[ https://issues.apache.org/jira/browse/BEAM-14332?focusedWorklogId=765727&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765727 ]

ASF GitHub Bot logged work on BEAM-14332:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/May/22 22:30
            Start Date: 03/May/22 22:30
    Worklog Time Spent: 10m 
      Work Description: rohdesamuel commented on code in PR #17402:
URL: https://github.com/apache/beam/pull/17402#discussion_r864286373


##########
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py:
##########
@@ -40,45 +40,25 @@ class UnimportedDataproc:
 
 _LOGGER = logging.getLogger(__name__)
 
+DATAPROC_STAGING_LOG_NAME = 'dataproc-startup-script_output'

Review Comment:
   Can you add a comment for what this constant is used for?
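A sketch of the kind of comment the reviewer is asking for — the wording below is an assumption inferred from the constant's name, not text from the PR:

```python
# Assumed purpose, inferred from the name: the file name of the startup-script
# log that Dataproc writes to the cluster's staging bucket, which the cluster
# manager can read to surface initialization output to the user.
DATAPROC_STAGING_LOG_NAME = 'dataproc-startup-script_output'
```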



##########
sdks/python/apache_beam/runners/interactive/interactive_beam.py:
##########
@@ -339,151 +342,229 @@ def record(self, pipeline):
 
 
 class Clusters:
-  """An interface for users to modify the pipelines that are being run by the
-  Interactive Environment.
+  """An interface to manage clusters running workers that are connected with
+  the current interactive environment.
 
-  Methods of the Interactive Beam Clusters class can be accessed via:
-    from apache_beam.runners.interactive import interactive_beam as ib
-    ib.clusters
+  This module is experimental. No backwards-compatibility guarantees.
 
-  Example of calling the Interactive Beam clusters describe method::
-    ib.clusters.describe()
+  Interactive Beam automatically creates or reuses worker clusters to
+  execute pipelines when the configuration indicates that one is needed.
+  Currently, the only supported cluster implementation is Flink running on
+  Cloud Dataproc.
+
+  To configure a pipeline to run on Cloud Dataproc with Flink, set the
+  underlying runner of the InteractiveRunner to FlinkRunner and use the
+  pipeline options to indicate where on Google Cloud it should be deployed.
+
+    An example to enable automatic Dataproc cluster creation/reuse::
+
+      options = PipelineOptions([
+          '--project=my-project',
+          '--region=my-region',
+          '--environment_type=DOCKER'])
+      pipeline = beam.Pipeline(InteractiveRunner(
+          underlying_runner=FlinkRunner()), options=options)
+
+  Reusing the same pipeline options in another pipeline configures
+  Interactive Beam to reuse the Dataproc cluster implicitly managed by the
+  current interactive environment.
+  If a flink_master is identified as a known cluster, the corresponding
+  cluster is also reused.
+  Furthermore, if a cluster is explicitly created by using a pipeline as an
+  identifier to a known cluster, the cluster is reused.
+
+    An example::
+
+      # dcm == ib.clusters.pipelines.get(pipeline), no cluster is newly
+      # created.

Review Comment:
   Feels a little weird to access a member in a class to get the cluster 
manager. Consider changing this into a method like `ib.clusters.get(pipeline)`. 
This flows a bit better.
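The suggested accessor could look something like this minimal sketch — the real `Clusters` class holds more state than shown, and the method name follows the reviewer's suggestion rather than the PR's actual API:

```python
class Clusters:
  def __init__(self):
    # Maps user pipelines to the cluster managers provisioned for them.
    self.pipelines = {}

  def get(self, pipeline):
    """Returns the cluster manager for the pipeline, or None if unknown."""
    return self.pipelines.get(pipeline, None)
```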



##########
sdks/python/apache_beam/runners/interactive/interactive_beam.py:
##########
@@ -339,151 +342,229 @@ def record(self, pipeline):
 
 
 class Clusters:
-  """An interface for users to modify the pipelines that are being run by the
-  Interactive Environment.
+  """An interface to manage clusters running workers that are connected with
+  the current interactive environment.
 
-  Methods of the Interactive Beam Clusters class can be accessed via:
-    from apache_beam.runners.interactive import interactive_beam as ib
-    ib.clusters
+  This module is experimental. No backwards-compatibility guarantees.
 
-  Example of calling the Interactive Beam clusters describe method::
-    ib.clusters.describe()
+  Interactive Beam automatically creates or reuses worker clusters to
+  execute pipelines when the configuration indicates that one is needed.
+  Currently, the only supported cluster implementation is Flink running on
+  Cloud Dataproc.
+
+  To configure a pipeline to run on Cloud Dataproc with Flink, set the
+  underlying runner of the InteractiveRunner to FlinkRunner and use the
+  pipeline options to indicate where on Google Cloud it should be deployed.
+
+    An example to enable automatic Dataproc cluster creation/reuse::
+
+      options = PipelineOptions([
+          '--project=my-project',
+          '--region=my-region',
+          '--environment_type=DOCKER'])
+      pipeline = beam.Pipeline(InteractiveRunner(
+          underlying_runner=FlinkRunner()), options=options)
+
+  Reusing the same pipeline options in another pipeline configures
+  Interactive Beam to reuse the Dataproc cluster implicitly managed by the
+  current interactive environment.
+  If a flink_master is identified as a known cluster, the corresponding
+  cluster is also reused.
+  Furthermore, if a cluster is explicitly created by using a pipeline as an
+  identifier to a known cluster, the cluster is reused.
+
+    An example::
+
+      # dcm == ib.clusters.pipelines.get(pipeline), no cluster is newly
+      # created.
+      dcm = ib.clusters.create(pipeline)
+
+  To configure a pipeline to run on an existing FlinkRunner deployed elsewhere,
+  set the flink_master explicitly so no cluster will be created/reused.
+
+    An example pipeline options to skip automatic Dataproc cluster usage::
+
+      options = PipelineOptions([
+          '--flink_master=some.self.hosted.flink:port',
+          '--environment_type=DOCKER'])
+
+  To configure a pipeline to run on a local FlinkRunner, explicitly set the
+  default cluster metadata to None: ib.clusters.set_default_cluster(None).
   """
   # Explicitly set the Flink version here to ensure compatibility with 2.0
   # Dataproc images:
  # https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0
   DATAPROC_FLINK_VERSION = '1.12'
+
   # TODO(BEAM-14142): Fix the Dataproc image version after a released image
   # contains all missing dependencies for Flink to run.
   # DATAPROC_IMAGE_VERSION = '2.0.XX-debian10'
-  DATAPROC_STAGING_LOG_NAME = 'dataproc-startup-script_output'
 
   def __init__(self) -> None:
-    """Instantiates default values for Dataproc cluster interactions.
-    """
-    # Set the default_cluster_name that will be used when creating Dataproc
-    # clusters.
-    self.default_cluster_name = 'interactive-beam-cluster'
-    # Bidirectional 1-1 mapping between master_urls (str) to cluster metadata
-    # (MasterURLIdentifier), where self.master_urls.inverse is a mapping from
-    # MasterURLIdentifier -> str.
-    self.master_urls = bidict()
-    # self.dataproc_cluster_managers map string pipeline ids to instances of
-    # DataprocClusterManager.
-    self.dataproc_cluster_managers = {}
-    # self.master_urls_to_pipelines map string master_urls to lists of
-    # pipelines that use the corresponding master_url.
-    self.master_urls_to_pipelines: DefaultDict[
-        str, List[beam.Pipeline]] = defaultdict(list)
-    # self.master_urls_to_dashboards map string master_urls to the
-    # corresponding Apache Flink dashboards.
-    self.master_urls_to_dashboards: Dict[str, str] = {}
-    # self.default_cluster_metadata for creating a DataprocClusterManager when
-    # a pipeline has its cluster deleted from the clusters Jupyterlab
-    # extension.
-    self.default_cluster_metadata = None
-
-  def describe(self, pipeline: Optional[beam.Pipeline] = None) -> dict:
-    """Returns a description of the cluster associated to the given pipeline.
-
-    If no pipeline is given then this returns a dictionary of descriptions for
-    all pipelines, mapped to by id.
+    self.dataproc_cluster_managers: Dict[ClusterMetadata,
+                                         DataprocClusterManager] = {}
+    self.master_urls: Dict[str, ClusterMetadata] = {}
+    self.pipelines: Dict[beam.Pipeline, DataprocClusterManager] = {}
+    self.default_cluster_metadata: Optional[ClusterMetadata] = None
+
+  def create(
+      self, cluster_identifier: ClusterIdentifier) -> DataprocClusterManager:
+    """Creates a Dataproc cluster manager provisioned for the cluster
+    identified. If the cluster is known, returns an existing cluster manager.
     """
-    description = {
-        pid: dcm.describe()
-        for pid,
-        dcm in self.dataproc_cluster_managers.items()
-    }
-    if pipeline:
-      return description.get(str(id(pipeline)), None)
-    return description
+    # Try to get some not-None cluster metadata.
+    cluster_metadata = self.cluster_metadata(cluster_identifier)
+    if not cluster_metadata:
+      raise ValueError(
+          'Unknown cluster identifier: %s. Cannot create or reuse '
+          'a Dataproc cluster.' % cluster_identifier)
+    elif cluster_metadata.region == 'global':
+      # The global region is unsupported as it will be eventually deprecated.
+      raise ValueError('Clusters in the global region are not supported.')
+    elif not cluster_metadata.region:
+      _LOGGER.info(
+          'No region information was detected, defaulting Dataproc cluster '
+          'region to: us-central1.')
+      cluster_metadata.region = 'us-central1'
+    # else use the provided region.
+    known_dcm = self.dataproc_cluster_managers.get(cluster_metadata, None)
+    if known_dcm:
+      return known_dcm
+    dcm = DataprocClusterManager(cluster_metadata)
+    dcm.create_flink_cluster()
+    # ClusterMetadata with derivative fields populated by the dcm.
+    derived_meta = dcm.cluster_metadata
+    self.dataproc_cluster_managers[derived_meta] = dcm
+    self.master_urls[derived_meta.master_url] = derived_meta
+    # Update the default cluster metadata to the one just created.
+    self.set_default_cluster(derived_meta)
+    return dcm
 
   def cleanup(
-      self, pipeline: Optional[beam.Pipeline] = None, force=False) -> None:
-    """Attempt to cleanup the Dataproc Cluster corresponding to the given
-    pipeline.
-
-    If the cluster is not managed by interactive_beam, a corresponding cluster
-    manager is not detected, and deletion is skipped.
-
-    For clusters managed by Interactive Beam, by default, deletion is skipped
-    if any other pipelines are using the cluster.
-
-    Optionally, the cleanup for a cluster managed by Interactive Beam can be
-    forced, by setting the 'force' parameter to True.
-
-    Example usage of default cleanup::
-      interactive_beam.clusters.cleanup(p)
-
-    Example usage of forceful cleanup::
-      interactive_beam.clusters.cleanup(p, force=True)
+      self, cluster_identifier: Optional[ClusterIdentifier] = None) -> None:
+    """Cleans up the cluster associated with the given cluster_identifier.
+
+    If no cluster_identifier is provided, cleans up all managed clusters.
+    If a beam.Pipeline is given as the ClusterIdentifier while multiple
+    pipelines share the same cluster, it only cleans up the association between
+    the pipeline and the cluster identified.
+    If the cluster_identifier is unknown, this is a no-op.
     """
-    if pipeline:
-      cluster_manager = self.dataproc_cluster_managers.get(
-          str(id(pipeline)), None)
-      if cluster_manager:
-        master_url = cluster_manager.master_url
-        if len(self.master_urls_to_pipelines[master_url]) > 1:
-          if force:
-            _LOGGER.warning(
-                'Cluster is currently being used by another cluster, but '
-                'will be forcefully cleaned up.')
-            cluster_manager.cleanup()
-          else:
-            _LOGGER.warning(
-                'Cluster is currently being used, skipping deletion.')
-          self.master_urls_to_pipelines[master_url].remove(str(id(pipeline)))
-        else:
-          cluster_manager.cleanup()
-          self.master_urls.pop(master_url, None)
-          self.master_urls_to_pipelines.pop(master_url, None)
-          self.master_urls_to_dashboards.pop(master_url, None)
-          self.dataproc_cluster_managers.pop(str(id(pipeline)), None)
+    if not cluster_identifier:  # Cleans up everything.
+      for dcm in set(self.dataproc_cluster_managers.values()):
+        self._cleanup(dcm)
+      self.default_cluster_metadata = None

Review Comment:
   Feels dangerous to give users such an easy way to clean up all cluster 
managers at once. Maybe we should keep the `force` argument and only clean up 
everything if it's true?
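A hedged sketch of the `force`-guarded variant the reviewer suggests — the fields mirror the diff above, but the guard itself is hypothetical, not code from the PR:

```python
class Clusters:
  def __init__(self):
    self.dataproc_cluster_managers = {}
    self.default_cluster_metadata = None

  def cleanup(self, cluster_identifier=None, force=False):
    """Cleans up the identified cluster; cleans up everything only if forced."""
    if cluster_identifier is None:
      if not force:
        # Refuse the clean-up-everything path unless the user is explicit.
        raise ValueError('Pass force=True to clean up all managed clusters.')
      for dcm in set(self.dataproc_cluster_managers.values()):
        dcm.cleanup()
      self.dataproc_cluster_managers.clear()
      self.default_cluster_metadata = None
```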



##########
sdks/python/apache_beam/runners/interactive/interactive_beam.py:
##########
@@ -339,151 +342,229 @@ def record(self, pipeline):
 
 
 class Clusters:
-  """An interface for users to modify the pipelines that are being run by the
-  Interactive Environment.
+  """An interface to manage clusters running workers that are connected with

Review Comment:
   This file is starting to get pretty big. It doesn't have to be in this PR, 
but we may want to start investigating moving some of this logic to sub-modules.



##########
sdks/python/apache_beam/runners/interactive/interactive_runner.py:
##########
@@ -224,77 +218,45 @@ def visit_transform(self, transform_node):
 
     return main_job_result
 
-  # TODO(victorhc): Move this method somewhere else if performance is impacted
-  # by generating a cluster during runtime.
-  def _get_dataproc_cluster_master_url_if_applicable(
-      self, user_pipeline: beam.Pipeline) -> str:
-    """ Creates a Dataproc cluster if the provided user_pipeline is running
-    FlinkRunner and no flink_master_url was provided as an option. A cluster
-    is not created when a flink_master_url is detected.
-
-    Example pipeline options to enable automatic Dataproc cluster creation:
-      options = PipelineOptions([
-      '--runner=FlinkRunner',
-      '--project=my-project',
-      '--region=my-region',
-      '--environment_type=DOCKER'
-      ])
-
-    Example pipeline options to skip automatic Dataproc cluster creation:
-      options = PipelineOptions([
-      '--runner=FlinkRunner',
-      '--flink_master=example.internal:41979',
-      '--environment_type=DOCKER'
-      ])
+  def tune_for_flink(

Review Comment:
   "tune" generally means "optimize" in this context. But looking through the 
method, maybe "configure" is better?



##########
sdks/python/apache_beam/runners/interactive/testing/mock_env.py:
##########
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module of mocks to isolated the test environment for each Interactive Beam
+test.
+"""
+
+import unittest
+import uuid
+from unittest.mock import patch
+
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager
+from apache_beam.runners.interactive.interactive_environment import InteractiveEnvironment
+from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython
+
+
+def isolated_env(cls: unittest.TestCase):

Review Comment:
   Pretty cool, can we make this the default for tests and instead have a 
decorator to turn this off? Isolated and idempotent unit tests should be the 
default.
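One way to invert the default, per the comment: make isolation the base-class behavior and offer an opt-out marker. The names below (`not_isolated`, `BaseTestCase`) are assumptions for illustration, not the PR's API:

```python
import unittest


def not_isolated(cls):
  # Hypothetical opt-out marker: classes tagged with this decorator keep the
  # shared environment instead of getting a fresh one per test.
  cls._run_isolated = False
  return cls


class BaseTestCase(unittest.TestCase):
  _run_isolated = True  # Isolation is the default.

  def setUp(self):
    if self._run_isolated:
      # Stand-in for swapping in a fresh InteractiveEnvironment.
      self.env = object()
```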



##########
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py:
##########
@@ -191,72 +175,60 @@ def cleanup(self) -> None:
             'Failed to delete cluster: %s', self.cluster_metadata.cluster_name)
         raise e
 
-  def describe(self) -> None:
-    """Returns a dictionary describing the cluster."""
-    return {
-        'cluster_metadata': self.cluster_metadata,
-        'master_url': self.master_url,
-        'dashboard': self.dashboard
-    }
-
-  def get_cluster_details(
-      self, cluster_metadata: MasterURLIdentifier) -> dataproc_v1.Cluster:
+  def get_cluster_details(self) -> dataproc_v1.Cluster:
     """Gets the Dataproc_v1 Cluster object for the current cluster manager."""
     try:
       return self._cluster_client.get_cluster(
           request={
-              'project_id': cluster_metadata.project_id,
-              'region': cluster_metadata.region,
-              'cluster_name': cluster_metadata.cluster_name
+              'project_id': self.cluster_metadata.project_id,
+              'region': self.cluster_metadata.region,
+              'cluster_name': self.cluster_metadata.cluster_name
           })
     except Exception as e:
       if e.code == 403:
         _LOGGER.error(
             'Due to insufficient project permissions, '
             'unable to retrieve information for cluster: %s',
-            cluster_metadata.cluster_name)
+            self.cluster_metadata.cluster_name)
         raise ValueError(
             'You cannot view clusters in project: {}'.format(
-                cluster_metadata.project_id))
+                self.cluster_metadata.project_id))
       elif e.code == 404:
         _LOGGER.error(
-            'Cluster does not exist: %s', cluster_metadata.cluster_name)
+            'Cluster does not exist: %s', self.cluster_metadata.cluster_name)
         raise ValueError(
-            'Cluster was not found: {}'.format(cluster_metadata.cluster_name))
+            'Cluster was not found: {}'.format(
+                self.cluster_metadata.cluster_name))
       else:
         _LOGGER.error(
             'Failed to get information for cluster: %s',
-            cluster_metadata.cluster_name)
+            self.cluster_metadata.cluster_name)
         raise e
 
-  def wait_for_cluster_to_provision(
-      self, cluster_metadata: MasterURLIdentifier) -> None:
-    while self.get_cluster_details(
-        cluster_metadata).status.state.name == 'CREATING':
+  def wait_for_cluster_to_provision(self) -> None:
+    while self.get_cluster_details().status.state.name == 'CREATING':
       time.sleep(15)

Review Comment:
   Will this always exit this loop?
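A bounded variant that addresses the question: as written, a cluster stuck in CREATING would make the loop spin forever. This sketch is illustrative — the function name and timeout value are assumptions, not the PR's code:

```python
import time


def wait_for_provision(get_state, timeout_secs=1800, poll_secs=15):
  """Polls until the cluster leaves CREATING, raising if it never does."""
  deadline = time.monotonic() + timeout_secs
  while get_state() == 'CREATING':
    if time.monotonic() >= deadline:
      raise TimeoutError('Cluster did not finish provisioning in time.')
    time.sleep(poll_secs)
```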





Issue Time Tracking
-------------------

    Worklog Id:     (was: 765727)
    Time Spent: 40m  (was: 0.5h)

> Improve the workflow of cluster management for Flink on Dataproc
> ----------------------------------------------------------------
>
>                 Key: BEAM-14332
>                 URL: https://issues.apache.org/jira/browse/BEAM-14332
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-py-interactive
>            Reporter: Ning
>            Assignee: Ning
>            Priority: P2
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Improve the workflow of cluster management.
> There is an option to configure a default [cluster 
> name|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/interactive_beam.py#L366].
>  The existing user flows are:
>  # Use the default cluster name to create a new cluster if none is in use;
>  # Reuse a created cluster that has the default cluster name;
>  # If the default cluster name is configured to a new value, re-apply 1 and 2.
>  A better solution is to 
>  # Create a new cluster implicitly if there is none or explicitly if the user 
> wants one with specific provisioning;
>  # Always default to using the last created cluster.
>  The reasons are:
>  * Cluster name is meaningless to the user when a cluster is just a medium to 
> run OSS runners (as applications) such as Flink or Spark. The cluster could 
> also be running anywhere (on GCP) such as Dataproc, k8s, or even Dataflow 
> itself.
>  * Clusters should be uniquely identified, thus should always have a distinct 
> name. Clusters are managed (created/reused/deleted) behind the scenes by the 
> notebook runtime when the user doesn’t explicitly do so (the capability to 
> explicitly manage clusters is still available). Reusing the same default 
> cluster name is risky when a cluster is deleted by one notebook runtime while 
> another cluster with the same name is created by a different notebook 
> runtime. 
>  * Provide the capability for the user to explicitly provision a cluster.
> The current implementation provisions each cluster at the location specified by 
> GoogleCloudOptions using 3 worker nodes. There is no explicit API to 
> configure the number or shape of workers.
> We could use the WorkerOptions to allow customers to explicitly provision a 
> cluster and expose an explicit API (with UX in notebook extension) for 
> customers to change the size of a cluster connected with their notebook 
> (until we have an auto scaling solution with Dataproc for Flink).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
