[ 
https://issues.apache.org/jira/browse/BEAM-14332?focusedWorklogId=766298&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-766298
 ]

ASF GitHub Bot logged work on BEAM-14332:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/May/22 21:33
            Start Date: 04/May/22 21:33
    Worklog Time Spent: 10m 
      Work Description: KevinGG commented on code in PR #17402:
URL: https://github.com/apache/beam/pull/17402#discussion_r865403256


##########
sdks/python/apache_beam/runners/interactive/interactive_beam.py:
##########
@@ -339,151 +342,229 @@ def record(self, pipeline):
 
 
 class Clusters:
-  """An interface for users to modify the pipelines that are being run by the
-  Interactive Environment.
+  """An interface to manage clusters running workers that are connected with
+  the current interactive environment.
 
-  Methods of the Interactive Beam Clusters class can be accessed via:
-    from apache_beam.runners.interactive import interactive_beam as ib
-    ib.clusters
+  This module is experimental. No backwards-compatibility guarantees.
 
-  Example of calling the Interactive Beam clusters describe method::
-    ib.clusters.describe()
+  Interactive Beam automatically creates/reuses existing worker clusters to
+  execute pipelines when it detects the need from configurations.
+  Currently, the only supported cluster implementation is Flink running on
+  Cloud Dataproc.
+
+  To configure a pipeline to run on Cloud Dataproc with Flink, set the
+  underlying runner of the InteractiveRunner to FlinkRunner and the pipeline
+  options to indicate where on Cloud the FlinkRunner should be deployed to.
+
+    An example to enable automatic Dataproc cluster creation/reuse::
+
+      options = PipelineOptions([
+          '--project=my-project',
+          '--region=my-region',
+          '--environment_type=DOCKER'])
+      pipeline = beam.Pipeline(InteractiveRunner(
+          underlying_runner=FlinkRunner()), options=options)
+
+  Reuse a pipeline options in another pipeline would configure Interactive Beam
+  to reuse the same Dataproc cluster implicitly managed by the current
+  interactive environment.
+  If a flink_master is identified as a known cluster, the corresponding cluster
+  is also resued.
+  Furthermore, if a cluster is explicitly created by using a pipeline as an
+  identifier to a known cluster, the cluster is reused.
+
+    An example::
+
+      # dcm == ib.clusters.pipelines.get(pipeline), no cluster is newly 
created.
+      dcm = ib.clusters.create(pipeline)
+
+  To configure a pipeline to run on an existing FlinkRunner deployed elsewhere,
+  set the flink_master explicitly so no cluster will be created/reused.
+
+    An example pipeline options to skip automatic Dataproc cluster usage::
+
+      options = PipelineOptions([
+          '--flink_master=some.self.hosted.flink:port',
+          '--environment_type=DOCKER'])
+
+  To configure a pipeline to run on a local FlinkRunner, explicitly set the
+  default cluster metadata to None: ib.clusters.set_default_cluster(None).
   """
   # Explicitly set the Flink version here to ensure compatibility with 2.0
   # Dataproc images:
   # 
https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0
   DATAPROC_FLINK_VERSION = '1.12'
+
   # TODO(BEAM-14142): Fix the Dataproc image version after a released image
   # contains all missing dependencies for Flink to run.
   # DATAPROC_IMAGE_VERSION = '2.0.XX-debian10'
-  DATAPROC_STAGING_LOG_NAME = 'dataproc-startup-script_output'
 
   def __init__(self) -> None:
-    """Instantiates default values for Dataproc cluster interactions.
-    """
-    # Set the default_cluster_name that will be used when creating Dataproc
-    # clusters.
-    self.default_cluster_name = 'interactive-beam-cluster'
-    # Bidirectional 1-1 mapping between master_urls (str) to cluster metadata
-    # (MasterURLIdentifier), where self.master_urls.inverse is a mapping from
-    # MasterURLIdentifier -> str.
-    self.master_urls = bidict()
-    # self.dataproc_cluster_managers map string pipeline ids to instances of
-    # DataprocClusterManager.
-    self.dataproc_cluster_managers = {}
-    # self.master_urls_to_pipelines map string master_urls to lists of
-    # pipelines that use the corresponding master_url.
-    self.master_urls_to_pipelines: DefaultDict[
-        str, List[beam.Pipeline]] = defaultdict(list)
-    # self.master_urls_to_dashboards map string master_urls to the
-    # corresponding Apache Flink dashboards.
-    self.master_urls_to_dashboards: Dict[str, str] = {}
-    # self.default_cluster_metadata for creating a DataprocClusterManager when
-    # a pipeline has its cluster deleted from the clusters Jupyterlab
-    # extension.
-    self.default_cluster_metadata = None
-
-  def describe(self, pipeline: Optional[beam.Pipeline] = None) -> dict:
-    """Returns a description of the cluster associated to the given pipeline.
-
-    If no pipeline is given then this returns a dictionary of descriptions for
-    all pipelines, mapped to by id.
+    self.dataproc_cluster_managers: Dict[ClusterMetadata,
+                                         DataprocClusterManager] = {}
+    self.master_urls: Dict[str, ClusterMetadata] = {}
+    self.pipelines: Dict[beam.Pipeline, DataprocClusterManager] = {}
+    self.default_cluster_metadata: Optional[ClusterMetadata] = None
+
+  def create(
+      self, cluster_identifier: ClusterIdentifier) -> DataprocClusterManager:
+    """Creates a Dataproc cluster manager provisioned for the cluster
+    identified. If the cluster is known, returns an existing cluster manager.
     """
-    description = {
-        pid: dcm.describe()
-        for pid,
-        dcm in self.dataproc_cluster_managers.items()
-    }
-    if pipeline:
-      return description.get(str(id(pipeline)), None)
-    return description
+    # Try to get some not-None cluster metadata.
+    cluster_metadata = self.cluster_metadata(cluster_identifier)
+    if not cluster_metadata:
+      raise ValueError(
+          'Unknown cluster identifier: %s. Cannot create or reuse'
+          'a Dataproc cluster.')
+    elif cluster_metadata.region == 'global':
+      # The global region is unsupported as it will be eventually deprecated.
+      raise ValueError('Clusters in the global region are not supported.')
+    elif not cluster_metadata.region:
+      _LOGGER.info(
+          'No region information was detected, defaulting Dataproc cluster '
+          'region to: us-central1.')
+      cluster_metadata.region = 'us-central1'
+    # else use the provided region.
+    known_dcm = self.dataproc_cluster_managers.get(cluster_metadata, None)
+    if known_dcm:
+      return known_dcm
+    dcm = DataprocClusterManager(cluster_metadata)
+    dcm.create_flink_cluster()
+    # ClusterMetadata with derivative fields populated by the dcm.
+    derived_meta = dcm.cluster_metadata
+    self.dataproc_cluster_managers[derived_meta] = dcm
+    self.master_urls[derived_meta.master_url] = derived_meta
+    # Update the default cluster metadata to the one just created.
+    self.set_default_cluster(derived_meta)
+    return dcm
 
   def cleanup(
-      self, pipeline: Optional[beam.Pipeline] = None, force=False) -> None:
-    """Attempt to cleanup the Dataproc Cluster corresponding to the given
-    pipeline.
-
-    If the cluster is not managed by interactive_beam, a corresponding cluster
-    manager is not detected, and deletion is skipped.
-
-    For clusters managed by Interactive Beam, by default, deletion is skipped
-    if any other pipelines are using the cluster.
-
-    Optionally, the cleanup for a cluster managed by Interactive Beam can be
-    forced, by setting the 'force' parameter to True.
-
-    Example usage of default cleanup::
-      interactive_beam.clusters.cleanup(p)
-
-    Example usage of forceful cleanup::
-      interactive_beam.clusters.cleanup(p, force=True)
+      self, cluster_identifier: Optional[ClusterIdentifier] = None) -> None:
+    """Cleans up the cluster associated with the given cluster_identifier.
+
+    If None cluster_identifier is provided, cleans up for all clusters.
+    If a beam.Pipeline is given as the ClusterIdentifier while multiple
+    pipelines share the same cluster, it only cleans up the association between
+    the pipeline and the cluster identified.
+    If the cluster_identifier is unknown, NOOP.
     """
-    if pipeline:
-      cluster_manager = self.dataproc_cluster_managers.get(
-          str(id(pipeline)), None)
-      if cluster_manager:
-        master_url = cluster_manager.master_url
-        if len(self.master_urls_to_pipelines[master_url]) > 1:
-          if force:
-            _LOGGER.warning(
-                'Cluster is currently being used by another cluster, but '
-                'will be forcefully cleaned up.')
-            cluster_manager.cleanup()
-          else:
-            _LOGGER.warning(
-                'Cluster is currently being used, skipping deletion.')
-          self.master_urls_to_pipelines[master_url].remove(str(id(pipeline)))
-        else:
-          cluster_manager.cleanup()
-          self.master_urls.pop(master_url, None)
-          self.master_urls_to_pipelines.pop(master_url, None)
-          self.master_urls_to_dashboards.pop(master_url, None)
-          self.dataproc_cluster_managers.pop(str(id(pipeline)), None)
+    if not cluster_identifier:  # Cleans up everything.
+      for dcm in set(self.dataproc_cluster_managers.values()):
+        self._cleanup(dcm)
+      self.default_cluster_metadata = None

Review Comment:
   Let me keep it simple and use the force argument only without a prompt.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 766298)
    Time Spent: 1h 10m  (was: 1h)

> Improve the workflow of cluster management for Flink on Dataproc
> ----------------------------------------------------------------
>
>                 Key: BEAM-14332
>                 URL: https://issues.apache.org/jira/browse/BEAM-14332
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-py-interactive
>            Reporter: Ning
>            Assignee: Ning
>            Priority: P2
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Improve the workflow of cluster management.
> There is an option to configure a default [cluster 
> name|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/interactive_beam.py#L366].
>  The existing user flows are:
>  # Use the default cluster name to create a new cluster if none is in use;
>  # Reuse a created cluster that has the default cluster name;
>  # If the default cluster name is configured to a new value, re-apply 1 and 2.
>  A better solution is to 
>  # Create a new cluster implicitly if there is none or explicitly if the user 
> wants one with specific provisioning;
>  # Always default to using the last created cluster.
>  The reasons are:
>  * Cluster name is meaningless to the user when a cluster is just a medium to 
> run OSS runners (as applications) such as Flink or Spark. The cluster could 
> also be running anywhere (on GCP) such as Dataproc, k8s, or even Dataflow 
> itself.
>  * Clusters should be uniquely identified, thus should always have a distinct 
> name. Clusters are managed (created/reused/deleted) behind the scenes by the 
> notebook runtime when the user doesn’t explicitly do so (the capability to 
> explicitly manage clusters is still available). Reusing the same default 
> cluster name is risky when a cluster is deleted by one notebook runtime while 
> another cluster with the same name is created by a different notebook 
> runtime. 
>  * Provide the capability for the user to explicitly provision a cluster.
> Current implementation provisions each cluster at the location specified by 
> GoogleCloudOptions using 3 worker nodes. There is no explicit API to 
> configure the number or shape of workers.
> We could use the WorkerOptions to allow customers to explicitly provision a 
> cluster and expose an explicit API (with UX in notebook extension) for 
> customers to change the size of a cluster connected with their notebook 
> (until we have an auto scaling solution with Dataproc for Flink).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to