rohdesamuel commented on code in PR #17402:
URL: https://github.com/apache/beam/pull/17402#discussion_r864286373


##########
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py:
##########
@@ -40,45 +40,25 @@ class UnimportedDataproc:
 
 _LOGGER = logging.getLogger(__name__)
 
+DATAPROC_STAGING_LOG_NAME = 'dataproc-startup-script_output'

Review Comment:
   Can you add a comment for what this constant is used for?



##########
sdks/python/apache_beam/runners/interactive/interactive_beam.py:
##########
@@ -339,151 +342,229 @@ def record(self, pipeline):
 
 
 class Clusters:
-  """An interface for users to modify the pipelines that are being run by the
-  Interactive Environment.
+  """An interface to manage clusters running workers that are connected with
+  the current interactive environment.
 
-  Methods of the Interactive Beam Clusters class can be accessed via:
-    from apache_beam.runners.interactive import interactive_beam as ib
-    ib.clusters
+  This module is experimental. No backwards-compatibility guarantees.
 
-  Example of calling the Interactive Beam clusters describe method::
-    ib.clusters.describe()
+  Interactive Beam automatically creates/reuses existing worker clusters to
+  execute pipelines when it detects the need from configurations.
+  Currently, the only supported cluster implementation is Flink running on
+  Cloud Dataproc.
+
+  To configure a pipeline to run on Cloud Dataproc with Flink, set the
+  underlying runner of the InteractiveRunner to FlinkRunner and the pipeline
+  options to indicate where on Cloud the FlinkRunner should be deployed to.
+
+    An example to enable automatic Dataproc cluster creation/reuse::
+
+      options = PipelineOptions([
+          '--project=my-project',
+          '--region=my-region',
+          '--environment_type=DOCKER'])
+      pipeline = beam.Pipeline(InteractiveRunner(
+          underlying_runner=FlinkRunner()), options=options)
+
+  Reuse a pipeline options in another pipeline would configure Interactive Beam
+  to reuse the same Dataproc cluster implicitly managed by the current
+  interactive environment.
+  If a flink_master is identified as a known cluster, the corresponding cluster
+  is also resued.
+  Furthermore, if a cluster is explicitly created by using a pipeline as an
+  identifier to a known cluster, the cluster is reused.
+
+    An example::
+
+      # dcm == ib.clusters.pipelines.get(pipeline), no cluster is newly 
created.

Review Comment:
   Feels a little weird to access a member in a class to get the cluster 
manager. Consider changing this into a method like `ib.clusters.get(pipeline)`. 
This flows a bit better.



##########
sdks/python/apache_beam/runners/interactive/interactive_beam.py:
##########
@@ -339,151 +342,229 @@ def record(self, pipeline):
 
 
 class Clusters:
-  """An interface for users to modify the pipelines that are being run by the
-  Interactive Environment.
+  """An interface to manage clusters running workers that are connected with
+  the current interactive environment.
 
-  Methods of the Interactive Beam Clusters class can be accessed via:
-    from apache_beam.runners.interactive import interactive_beam as ib
-    ib.clusters
+  This module is experimental. No backwards-compatibility guarantees.
 
-  Example of calling the Interactive Beam clusters describe method::
-    ib.clusters.describe()
+  Interactive Beam automatically creates/reuses existing worker clusters to
+  execute pipelines when it detects the need from configurations.
+  Currently, the only supported cluster implementation is Flink running on
+  Cloud Dataproc.
+
+  To configure a pipeline to run on Cloud Dataproc with Flink, set the
+  underlying runner of the InteractiveRunner to FlinkRunner and the pipeline
+  options to indicate where on Cloud the FlinkRunner should be deployed to.
+
+    An example to enable automatic Dataproc cluster creation/reuse::
+
+      options = PipelineOptions([
+          '--project=my-project',
+          '--region=my-region',
+          '--environment_type=DOCKER'])
+      pipeline = beam.Pipeline(InteractiveRunner(
+          underlying_runner=FlinkRunner()), options=options)
+
+  Reuse a pipeline options in another pipeline would configure Interactive Beam
+  to reuse the same Dataproc cluster implicitly managed by the current
+  interactive environment.
+  If a flink_master is identified as a known cluster, the corresponding cluster
+  is also resued.
+  Furthermore, if a cluster is explicitly created by using a pipeline as an
+  identifier to a known cluster, the cluster is reused.
+
+    An example::
+
+      # dcm == ib.clusters.pipelines.get(pipeline), no cluster is newly 
created.
+      dcm = ib.clusters.create(pipeline)
+
+  To configure a pipeline to run on an existing FlinkRunner deployed elsewhere,
+  set the flink_master explicitly so no cluster will be created/reused.
+
+    An example pipeline options to skip automatic Dataproc cluster usage::
+
+      options = PipelineOptions([
+          '--flink_master=some.self.hosted.flink:port',
+          '--environment_type=DOCKER'])
+
+  To configure a pipeline to run on a local FlinkRunner, explicitly set the
+  default cluster metadata to None: ib.clusters.set_default_cluster(None).
   """
   # Explicitly set the Flink version here to ensure compatibility with 2.0
   # Dataproc images:
   # 
https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0
   DATAPROC_FLINK_VERSION = '1.12'
+
   # TODO(BEAM-14142): Fix the Dataproc image version after a released image
   # contains all missing dependencies for Flink to run.
   # DATAPROC_IMAGE_VERSION = '2.0.XX-debian10'
-  DATAPROC_STAGING_LOG_NAME = 'dataproc-startup-script_output'
 
   def __init__(self) -> None:
-    """Instantiates default values for Dataproc cluster interactions.
-    """
-    # Set the default_cluster_name that will be used when creating Dataproc
-    # clusters.
-    self.default_cluster_name = 'interactive-beam-cluster'
-    # Bidirectional 1-1 mapping between master_urls (str) to cluster metadata
-    # (MasterURLIdentifier), where self.master_urls.inverse is a mapping from
-    # MasterURLIdentifier -> str.
-    self.master_urls = bidict()
-    # self.dataproc_cluster_managers map string pipeline ids to instances of
-    # DataprocClusterManager.
-    self.dataproc_cluster_managers = {}
-    # self.master_urls_to_pipelines map string master_urls to lists of
-    # pipelines that use the corresponding master_url.
-    self.master_urls_to_pipelines: DefaultDict[
-        str, List[beam.Pipeline]] = defaultdict(list)
-    # self.master_urls_to_dashboards map string master_urls to the
-    # corresponding Apache Flink dashboards.
-    self.master_urls_to_dashboards: Dict[str, str] = {}
-    # self.default_cluster_metadata for creating a DataprocClusterManager when
-    # a pipeline has its cluster deleted from the clusters Jupyterlab
-    # extension.
-    self.default_cluster_metadata = None
-
-  def describe(self, pipeline: Optional[beam.Pipeline] = None) -> dict:
-    """Returns a description of the cluster associated to the given pipeline.
-
-    If no pipeline is given then this returns a dictionary of descriptions for
-    all pipelines, mapped to by id.
+    self.dataproc_cluster_managers: Dict[ClusterMetadata,
+                                         DataprocClusterManager] = {}
+    self.master_urls: Dict[str, ClusterMetadata] = {}
+    self.pipelines: Dict[beam.Pipeline, DataprocClusterManager] = {}
+    self.default_cluster_metadata: Optional[ClusterMetadata] = None
+
+  def create(
+      self, cluster_identifier: ClusterIdentifier) -> DataprocClusterManager:
+    """Creates a Dataproc cluster manager provisioned for the cluster
+    identified. If the cluster is known, returns an existing cluster manager.
     """
-    description = {
-        pid: dcm.describe()
-        for pid,
-        dcm in self.dataproc_cluster_managers.items()
-    }
-    if pipeline:
-      return description.get(str(id(pipeline)), None)
-    return description
+    # Try to get some not-None cluster metadata.
+    cluster_metadata = self.cluster_metadata(cluster_identifier)
+    if not cluster_metadata:
+      raise ValueError(
+          'Unknown cluster identifier: %s. Cannot create or reuse'
+          'a Dataproc cluster.')
+    elif cluster_metadata.region == 'global':
+      # The global region is unsupported as it will be eventually deprecated.
+      raise ValueError('Clusters in the global region are not supported.')
+    elif not cluster_metadata.region:
+      _LOGGER.info(
+          'No region information was detected, defaulting Dataproc cluster '
+          'region to: us-central1.')
+      cluster_metadata.region = 'us-central1'
+    # else use the provided region.
+    known_dcm = self.dataproc_cluster_managers.get(cluster_metadata, None)
+    if known_dcm:
+      return known_dcm
+    dcm = DataprocClusterManager(cluster_metadata)
+    dcm.create_flink_cluster()
+    # ClusterMetadata with derivative fields populated by the dcm.
+    derived_meta = dcm.cluster_metadata
+    self.dataproc_cluster_managers[derived_meta] = dcm
+    self.master_urls[derived_meta.master_url] = derived_meta
+    # Update the default cluster metadata to the one just created.
+    self.set_default_cluster(derived_meta)
+    return dcm
 
   def cleanup(
-      self, pipeline: Optional[beam.Pipeline] = None, force=False) -> None:
-    """Attempt to cleanup the Dataproc Cluster corresponding to the given
-    pipeline.
-
-    If the cluster is not managed by interactive_beam, a corresponding cluster
-    manager is not detected, and deletion is skipped.
-
-    For clusters managed by Interactive Beam, by default, deletion is skipped
-    if any other pipelines are using the cluster.
-
-    Optionally, the cleanup for a cluster managed by Interactive Beam can be
-    forced, by setting the 'force' parameter to True.
-
-    Example usage of default cleanup::
-      interactive_beam.clusters.cleanup(p)
-
-    Example usage of forceful cleanup::
-      interactive_beam.clusters.cleanup(p, force=True)
+      self, cluster_identifier: Optional[ClusterIdentifier] = None) -> None:
+    """Cleans up the cluster associated with the given cluster_identifier.
+
+    If None cluster_identifier is provided, cleans up for all clusters.
+    If a beam.Pipeline is given as the ClusterIdentifier while multiple
+    pipelines share the same cluster, it only cleans up the association between
+    the pipeline and the cluster identified.
+    If the cluster_identifier is unknown, NOOP.
     """
-    if pipeline:
-      cluster_manager = self.dataproc_cluster_managers.get(
-          str(id(pipeline)), None)
-      if cluster_manager:
-        master_url = cluster_manager.master_url
-        if len(self.master_urls_to_pipelines[master_url]) > 1:
-          if force:
-            _LOGGER.warning(
-                'Cluster is currently being used by another cluster, but '
-                'will be forcefully cleaned up.')
-            cluster_manager.cleanup()
-          else:
-            _LOGGER.warning(
-                'Cluster is currently being used, skipping deletion.')
-          self.master_urls_to_pipelines[master_url].remove(str(id(pipeline)))
-        else:
-          cluster_manager.cleanup()
-          self.master_urls.pop(master_url, None)
-          self.master_urls_to_pipelines.pop(master_url, None)
-          self.master_urls_to_dashboards.pop(master_url, None)
-          self.dataproc_cluster_managers.pop(str(id(pipeline)), None)
+    if not cluster_identifier:  # Cleans up everything.
+      for dcm in set(self.dataproc_cluster_managers.values()):
+        self._cleanup(dcm)
+      self.default_cluster_metadata = None

Review Comment:
   Feels dangerous to give users such an easy way to clean up all cluster 
managers at once. Maybe we should keep the `force` argument and only clean up 
everything if it's true?



##########
sdks/python/apache_beam/runners/interactive/interactive_beam.py:
##########
@@ -339,151 +342,229 @@ def record(self, pipeline):
 
 
 class Clusters:
-  """An interface for users to modify the pipelines that are being run by the
-  Interactive Environment.
+  """An interface to manage clusters running workers that are connected with

Review Comment:
   This file is starting to get pretty big. It doesn't have to be in this PR, 
but we may want to start investigating moving some of this logic to sub-modules.



##########
sdks/python/apache_beam/runners/interactive/interactive_runner.py:
##########
@@ -224,77 +218,45 @@ def visit_transform(self, transform_node):
 
     return main_job_result
 
-  # TODO(victorhc): Move this method somewhere else if performance is impacted
-  # by generating a cluster during runtime.
-  def _get_dataproc_cluster_master_url_if_applicable(
-      self, user_pipeline: beam.Pipeline) -> str:
-    """ Creates a Dataproc cluster if the provided user_pipeline is running
-    FlinkRunner and no flink_master_url was provided as an option. A cluster
-    is not created when a flink_master_url is detected.
-
-    Example pipeline options to enable automatic Dataproc cluster creation:
-      options = PipelineOptions([
-      '--runner=FlinkRunner',
-      '--project=my-project',
-      '--region=my-region',
-      '--environment_type=DOCKER'
-      ])
-
-    Example pipeline options to skip automatic Dataproc cluster creation:
-      options = PipelineOptions([
-      '--runner=FlinkRunner',
-      '--flink_master=example.internal:41979',
-      '--environment_type=DOCKER'
-      ])
+  def tune_for_flink(

Review Comment:
   "tune" generally means "optimize" in this context. But looking through the 
method, maybe "configure" is better?



##########
sdks/python/apache_beam/runners/interactive/testing/mock_env.py:
##########
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module of mocks to isolated the test environment for each Interactive Beam
+test.
+"""
+
+import unittest
+import uuid
+from unittest.mock import patch
+
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import 
DataprocClusterManager
+from apache_beam.runners.interactive.interactive_environment import 
InteractiveEnvironment
+from apache_beam.runners.interactive.testing.mock_ipython import 
mock_get_ipython
+
+
+def isolated_env(cls: unittest.TestCase):

Review Comment:
   Pretty cool, can we make this the default for tests and instead have a 
decorator to turn this off? Isolated and idempotent unit tests should be the 
default.



##########
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py:
##########
@@ -191,72 +175,60 @@ def cleanup(self) -> None:
             'Failed to delete cluster: %s', self.cluster_metadata.cluster_name)
         raise e
 
-  def describe(self) -> None:
-    """Returns a dictionary describing the cluster."""
-    return {
-        'cluster_metadata': self.cluster_metadata,
-        'master_url': self.master_url,
-        'dashboard': self.dashboard
-    }
-
-  def get_cluster_details(
-      self, cluster_metadata: MasterURLIdentifier) -> dataproc_v1.Cluster:
+  def get_cluster_details(self) -> dataproc_v1.Cluster:
     """Gets the Dataproc_v1 Cluster object for the current cluster manager."""
     try:
       return self._cluster_client.get_cluster(
           request={
-              'project_id': cluster_metadata.project_id,
-              'region': cluster_metadata.region,
-              'cluster_name': cluster_metadata.cluster_name
+              'project_id': self.cluster_metadata.project_id,
+              'region': self.cluster_metadata.region,
+              'cluster_name': self.cluster_metadata.cluster_name
           })
     except Exception as e:
       if e.code == 403:
         _LOGGER.error(
             'Due to insufficient project permissions, '
             'unable to retrieve information for cluster: %s',
-            cluster_metadata.cluster_name)
+            self.cluster_metadata.cluster_name)
         raise ValueError(
             'You cannot view clusters in project: {}'.format(
-                cluster_metadata.project_id))
+                self.cluster_metadata.project_id))
       elif e.code == 404:
         _LOGGER.error(
-            'Cluster does not exist: %s', cluster_metadata.cluster_name)
+            'Cluster does not exist: %s', self.cluster_metadata.cluster_name)
         raise ValueError(
-            'Cluster was not found: {}'.format(cluster_metadata.cluster_name))
+            'Cluster was not found: {}'.format(
+                self.cluster_metadata.cluster_name))
       else:
         _LOGGER.error(
             'Failed to get information for cluster: %s',
-            cluster_metadata.cluster_name)
+            self.cluster_metadata.cluster_name)
         raise e
 
-  def wait_for_cluster_to_provision(
-      self, cluster_metadata: MasterURLIdentifier) -> None:
-    while self.get_cluster_details(
-        cluster_metadata).status.state.name == 'CREATING':
+  def wait_for_cluster_to_provision(self) -> None:
+    while self.get_cluster_details().status.state.name == 'CREATING':
       time.sleep(15)

Review Comment:
   Will this always exit this loop?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to