KevinGG commented on a change in pull request #16741:
URL: https://github.com/apache/beam/pull/16741#discussion_r808346143



##########
File path: 
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -28,50 +31,56 @@
 
 _LOGGER = logging.getLogger(__name__)
 
+@dataclass
+class MasterURLIdentifier:
+  project_id: Optional[str] = None
+  region: Optional[str] = None
+  cluster_name: Optional[str] = None
+
+  def __key(self):
+    return (self.project_id, self.region, self.cluster_name)
+
+  def __hash__(self):
+    return hash(self.__key())
+
+  def __eq__(self, other):
+    if isinstance(other, MasterURLIdentifier):
+      return self.__key() == other.__key()
+    raise NotImplementedError('Comparisons are only supported between '
+    'instances of MasterURLIdentifier.')
+
 
 class DataprocClusterManager:
   """The DataprocClusterManager object simplifies the operations
   required for creating and deleting Dataproc clusters for use
   under Interactive Beam.
   """
-  DEFAULT_NAME = 'interactive-beam-cluster'
-
   def __init__(
       self,
-      project_id: Optional[str] = None,
-      region: Optional[str] = None,
-      cluster_name: Optional[str] = None) -> None:
+      cluster_metadata: Optional[MasterURLIdentifier] = None) -> None:

Review comment:
        Is it Optional?

##########
File path: 
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -84,16 +93,24 @@ def create_cluster(self, cluster: dict) -> None:
     try:
       self._cluster_client.create_cluster(
           request={
-              'project_id': self._project_id,
-              'region': self._region,
+              'project_id': self.cluster_metadata.project_id,
+              'region': self.cluster_metadata.region,
               'cluster': cluster
           })
-      _LOGGER.info('Cluster created successfully: %s', self._cluster_name)
+      _LOGGER.info('Cluster created successfully: %s', 
self.cluster_metadata.cluster_name)
+      self.master_url = self.get_master_url(self.cluster_metadata, 
existing=False)
     except Exception as e:
       if e.code == 409:
-        if self._cluster_name == self.DEFAULT_NAME:
+        if self.cluster_metadata.cluster_name == 
ie.current_env().clusters.default_cluster_name:
           _LOGGER.info(
-              'Cluster %s already exists. Continuing...', self.DEFAULT_NAME)
+              'Cluster %s already exists. Continuing...',
+              ie.current_env().clusters.default_cluster_name)
+          if self.master_url:
+            _LOGGER.warning(
+              'Getting %s cluster master_url from storage...',
+              ie.current_env().clusters.default_cluster_name)

Review comment:
       This part should be getting the master_url from storage:
   
   self.master_url = self.get_master_url(...)

##########
File path: 
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -135,30 +163,38 @@ def cleanup(self) -> None:
     try:
       self._cluster_client.delete_cluster(
           request={
-              'project_id': self._project_id,
-              'region': self._region,
-              'cluster_name': self._cluster_name,
+              'project_id': self.cluster_metadata.project_id,
+              'region': self.cluster_metadata.region,
+              'cluster_name': self.cluster_metadata.cluster_name,
           })
     except Exception as e:
       if e.code == 403:
         _LOGGER.error(
             'Due to insufficient project permissions, '
             'unable to clean up the default cluster: %s',
-            self._cluster_name)
+            self.cluster_metadata.cluster_name)
         raise ValueError(
             'You cannot delete a cluster in project: {}'.format(
-                self._project_id))
+                self.cluster_metadata.project_id))
       elif e.code == 404:
-        _LOGGER.error('Cluster does not exist: %s', self._cluster_name)
-        raise ValueError('Cluster was not found: 
{}'.format(self._cluster_name))
+        _LOGGER.error('Cluster does not exist: %s', 
self.cluster_metadata.cluster_name)
+        raise ValueError('Cluster was not found: 
{}'.format(self.cluster_metadata.cluster_name))
       else:
-        _LOGGER.error('Failed to delete cluster: %s', self._cluster_name)
+        _LOGGER.error('Failed to delete cluster: %s', 
self.cluster_metadata.cluster_name)
         raise e
 
-  def cleanup_if_default(self) -> None:
-    """Checks if the cluster_name initialized with the
-    DataprocClusterManager instance is the default
-    cluster_name. If it is, then the cleanup() method
-    is invoked."""
-    if self._cluster_name == self.DEFAULT_NAME:
-      self.cleanup()
+  def describe(self) -> None:
+    """Returns a dictionary describing the cluster."""
+    return {
+        'cluster_metadata': self.cluster_metadata,
+        'master_url': self.master_url
+    }
+
+  def get_master_url(self, identifier, existing=True) -> None:

Review comment:
       The signature should only be `get_master_url(self, identifier)`.
   
   Except for the first time a job is executed, the logic is usually just:
   
   ```
   # The key should always exist; otherwise, you shouldn't be creating a
   # cluster manager.
   master_url = ie.current_env().clusters.master_urls_to_identifiers.inverse[identifier]
   ```
   
   

##########
File path: 
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -84,16 +93,24 @@ def create_cluster(self, cluster: dict) -> None:
     try:
       self._cluster_client.create_cluster(
           request={
-              'project_id': self._project_id,
-              'region': self._region,
+              'project_id': self.cluster_metadata.project_id,
+              'region': self.cluster_metadata.region,
               'cluster': cluster
           })
-      _LOGGER.info('Cluster created successfully: %s', self._cluster_name)
+      _LOGGER.info('Cluster created successfully: %s', 
self.cluster_metadata.cluster_name)
+      self.master_url = self.get_master_url(self.cluster_metadata, 
existing=False)
     except Exception as e:
       if e.code == 409:
-        if self._cluster_name == self.DEFAULT_NAME:
+        if self.cluster_metadata.cluster_name == 
ie.current_env().clusters.default_cluster_name:

Review comment:
       This check should instead test whether `cluster_metadata` exists in
   `ie.current_env().clusters.urls_to_ids.inverse`.

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -209,6 +218,62 @@ def visit_transform(self, transform_node):
 
     return main_job_result
 
+  # TODO(victorhc): Move this method somewhere else if performance is impacted
+  # by generating a cluster during runtime.
+  def _create_dataproc_cluster_if_applicable(self, user_pipeline):
+    """ Creates a Dataproc cluster if the provided user_pipeline is running
+    FlinkRunner and no flink_master_url was provided as an option. A cluster
+    is not created when a flink_master_url is detected.
+
+    Example pipeline options to enable automatic Dataproc cluster creation:
+      options = PipelineOptions([
+      '--runner=FlinkRunner',
+      '--project=my-project',
+      '--region=my-region',
+      '--environment_type=DOCKER'
+      ])
+
+    Example pipeline options to skip automatic Dataproc cluster creation:
+      options = PipelineOptions([
+      '--runner=FlinkRunner',
+      '--flink_master=example.internal:41979',
+      '--environment_type=DOCKER'
+      ])
+    """
+    from apache_beam.options.pipeline_options import FlinkRunnerOptions
+    flink_master = user_pipeline.options.view_as(
+        FlinkRunnerOptions).flink_master
+    if flink_master == '[auto]':
+      from apache_beam.runners.portability.flink_runner import FlinkRunner
+      if isinstance(self._underlying_runner, FlinkRunner):

Review comment:
       Can this be
   
   ```Python
   # Only consider this logic when both of the conditions below apply.
   if isinstance(self._underlying_runner, 
       FlinkRunner) and clusters.dataproc_cluster_managers.get(
       str(id(user_pipeline)), None) is None:
     if flink_master == '[auto]':
       # build_metadata from options
     elif flink_master in ie.current_env().clusters.master_urls:
       # get metadata from managed master_urls
       cluster_metadata = ie.current_env().clusters.master_urls.get(flink_master)
     # Else no-op: no need to log anything, because we allow the user to provide
     # a master_url that is not managed by us.
     if cluster_metadata:
       # create the cluster_manager and populate dicts in the clusters instance.
   ```

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -209,6 +218,62 @@ def visit_transform(self, transform_node):
 
     return main_job_result
 
+  # TODO(victorhc): Move this method somewhere else if performance is impacted
+  # by generating a cluster during runtime.
+  def _create_dataproc_cluster_if_applicable(self, user_pipeline):
+    """ Creates a Dataproc cluster if the provided user_pipeline is running
+    FlinkRunner and no flink_master_url was provided as an option. A cluster
+    is not created when a flink_master_url is detected.
+
+    Example pipeline options to enable automatic Dataproc cluster creation:
+      options = PipelineOptions([
+      '--runner=FlinkRunner',
+      '--project=my-project',
+      '--region=my-region',
+      '--environment_type=DOCKER'
+      ])
+
+    Example pipeline options to skip automatic Dataproc cluster creation:
+      options = PipelineOptions([
+      '--runner=FlinkRunner',
+      '--flink_master=example.internal:41979',
+      '--environment_type=DOCKER'
+      ])
+    """
+    from apache_beam.options.pipeline_options import FlinkRunnerOptions
+    flink_master = user_pipeline.options.view_as(
+        FlinkRunnerOptions).flink_master
+    if flink_master == '[auto]':

Review comment:
       Please add a comment indicating that when this is True, the master URL
   has not been set.

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -329,6 +332,94 @@ def record(self, pipeline):
     return recording_manager.record_pipeline()
 
 
+class Clusters:
+  """An interface for users to modify the pipelines that are being run by the
+  Interactive Environment.
+
+  Methods of the Interactive Beam Clusters class can be accessed via:
+    from apache_beam.runners.interactive import interactive_beam as ib
+    ib.clusters
+
+  Example of calling the Interactive Beam clusters describe method::
+    ib.clusters.describe()
+  """
+  def __init__(self) -> None:
+    """Instantiates default values for Dataproc cluster interactions.
+    """
+    self.default_cluster_name = 'interactive-beam-cluster'
+    self.master_urls = bidict()

Review comment:
       Add a comment describing what this bidict maps from and what it maps to
   (including the type hints).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to