KevinGG commented on a change in pull request #16741:
URL: https://github.com/apache/beam/pull/16741#discussion_r807221613
##########
File path:
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -91,9 +97,10 @@ def create_cluster(self, cluster: dict) -> None:
_LOGGER.info('Cluster created successfully: %s', self._cluster_name)
except Exception as e:
if e.code == 409:
- if self._cluster_name == self.DEFAULT_NAME:
+ if self._cluster_name ==
ie.current_env().clusters.default_cluster_name:
Review comment:
Use `ie.current_env().clusters.url_to_ids.inverse.get(self.cluster_metadata, None)`:
if the result is None, raise a ValueError; otherwise, log a warning and
retrieve the master_url from the stored mapping via get_master_url.
##########
File path:
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -91,9 +97,10 @@ def create_cluster(self, cluster: dict) -> None:
_LOGGER.info('Cluster created successfully: %s', self._cluster_name)
except Exception as e:
if e.code == 409:
- if self._cluster_name == self.DEFAULT_NAME:
+ if self._cluster_name ==
ie.current_env().clusters.default_cluster_name:
_LOGGER.info(
- 'Cluster %s already exists. Continuing...', self.DEFAULT_NAME)
+ 'Cluster %s already exists. Continuing...',
+ ie.current_env().clusters.default_cluster_name)
Review comment:
self.cluster_metadata.cluster_name
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -329,6 +359,101 @@ def record(self, pipeline):
return recording_manager.record_pipeline()
+class Clusters():
+ """An interface for users to modify the pipelines that are being run by the
+ Interactive Environment.
+
+ Methods of the Interactive Beam Clusters class can be accessed via:
+ interactive_beam.clusters
+
+ Example of calling the Interactive Beam clusters describe method::
+ interactive_beam.clusters.describe()
+ """
+ def __init__(self) -> None:
+ """Instantiates default values for Dataproc cluster interactions.
+ """
+ self._default_cluster_name = 'interactive-beam-cluster'
+ self._master_urls = bidict()
+ self._dataproc_cluster_managers = {}
+ self._master_urls_to_pipelines = {}
+
+ def describe(self, pipeline: Optional[beam.Pipeline]=None) -> dict:
+ """Returns a description of the cluster associated to the given pipeline.
+
+ If no pipeline is given then this returns a dictionary of descriptions for
+ all pipelines.
+ """
+
+ description = ie.current_env().describe_all_clusters()
+ if pipeline:
+ return description.get(pipeline, None)
+ return description
+
+ @property
+ def default_cluster_name(self) -> str:
+ """The default name to be used when creating Dataproc clusters.
+
+ Defaults to 'interactive-beam-cluster'.
+ """
+ return self._default_cluster_name
+
+ @default_cluster_name.setter
+ def default_cluster_name(self, value: bool) -> None:
+ """Sets the default name to be used when creating Dataproc clusters.
+
+ Defaults to 'interactive-beam-cluster'.
+
+ Example of assigning a default_cluster_name::
+ interactive_beam.clusters.default_cluster_name = 'my-beam-cluster'
+ """
+ self._default_cluster_name = value
+
+ def cleanup(self, pipeline: beam.Pipeline, forcefully=False) -> None:
+ """Attempt to cleanup the Dataproc Cluster corresponding to the given
pipeline.
+
+ If the cluster is not managed by interactive_beam, a corresponding cluster
+ manager is not detected, and deletion is skipped.
+
+ For clusters managed by Interactive Beam, by default, deletion is skipped
+ if any other pipelines are using the cluster.
+
+ Optionally, the cleanup for a cluster managed by Interactive Beam can be
+ forced, by setting the 'forcefully' parameter to True.
+
+ Example usage of default cleanup::
+ interactive_beam.clusters.cleanup(p)
+
+ Example usage of forceful cleanup::
+ interactive_beam.clusters.cleanup(p, forcefully=True)
+ """
+ cluster_manager = ie.current_env().get_dataproc_cluster_manager(pipeline)
+ if cluster_manager:
+ master_url = cluster_manager.master_url
+ if len(self.get_pipelines_using_master_url(master_url)) > 1:
+ if forcefully:
+ _LOGGER.warning(
+ 'Cluster is currently being used by another cluster, but '
+ 'will be forcefully cleaned up.')
+ cluster_manager.cleanup()
+ self._master_urls_to_pipelines[master_url].remove(str(id(pipeline)))
+ else:
+ _LOGGER.warning(
+ 'Cluster is currently being used, skipping deletion.')
+ else:
+ cluster_manager.cleanup()
+ del self._master_urls[master_url]
+ del self._master_urls_to_pipelines[master_url]
Review comment:
See if pop is a better option
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -329,6 +359,101 @@ def record(self, pipeline):
return recording_manager.record_pipeline()
+class Clusters():
+ """An interface for users to modify the pipelines that are being run by the
+ Interactive Environment.
+
+ Methods of the Interactive Beam Clusters class can be accessed via:
Review comment:
from apache_beam.runners.interactive import interactive_beam as ib
##########
File path:
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -226,6 +232,16 @@ def options(self):
from apache_beam.runners.interactive.interactive_beam import options
return options
+ @property
Review comment:
Make it public and skip the @property
##########
File path:
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -258,31 +274,53 @@ def inspector_with_synthetic(self):
synthetic variables generated by Interactive Beam. Internally used."""
return self._inspector_with_synthetic
+ def cleanup_pipeline(self, pipeline):
+ from apache_beam.runners.interactive import background_caching_job as bcj
+ bcj.attempt_to_cancel_background_caching_job(pipeline)
+ bcj.attempt_to_stop_test_stream_service(pipeline)
+ cache_manager = self.get_cache_manager(pipeline)
+ # Recording manager performs cache manager cleanup during eviction, so we
+ # don't need to clean it up here.
+ if cache_manager and self.get_recording_manager(pipeline) is None:
+ cache_manager.cleanup()
+ if self.options.cleanup_cluster:
+ cluster_manager = self.get_dataproc_cluster_manager(pipeline)
+ if cluster_manager:
Review comment:
Just call self.clusters.cleanup(pipeline)
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -209,6 +214,50 @@ def visit_transform(self, transform_node):
return main_job_result
+ # TODO(victorhc): Move this method somewhere else if performance is impacted
+ # by generating a cluster during runtime.
+ def _create_dataproc_cluster_if_applicable(self, user_pipeline):
+ """ Creates a Dataproc cluster if the provided user_pipeline is running
+ FlinkRunner and no flink_master_url was provided as an option. A cluster
+ is not created when a flink_master_url is detected.
Review comment:
if ie.current_env().clusters.master_urls.get(master_url): ->
cluster_metadata to create a cluster manager
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -329,6 +359,101 @@ def record(self, pipeline):
return recording_manager.record_pipeline()
+class Clusters():
+ """An interface for users to modify the pipelines that are being run by the
+ Interactive Environment.
+
+ Methods of the Interactive Beam Clusters class can be accessed via:
+ interactive_beam.clusters
+
+ Example of calling the Interactive Beam clusters describe method::
+ interactive_beam.clusters.describe()
+ """
+ def __init__(self) -> None:
+ """Instantiates default values for Dataproc cluster interactions.
+ """
+ self._default_cluster_name = 'interactive-beam-cluster'
+ self._master_urls = bidict()
+ self._dataproc_cluster_managers = {}
+ self._master_urls_to_pipelines = {}
+
+ def describe(self, pipeline: Optional[beam.Pipeline]=None) -> dict:
+ """Returns a description of the cluster associated to the given pipeline.
+
+ If no pipeline is given then this returns a dictionary of descriptions for
+ all pipelines.
+ """
+
+ description = ie.current_env().describe_all_clusters()
+ if pipeline:
+ return description.get(pipeline, None)
+ return description
+
+ @property
+ def default_cluster_name(self) -> str:
+ """The default name to be used when creating Dataproc clusters.
+
+ Defaults to 'interactive-beam-cluster'.
+ """
+ return self._default_cluster_name
+
+ @default_cluster_name.setter
+ def default_cluster_name(self, value: bool) -> None:
+ """Sets the default name to be used when creating Dataproc clusters.
+
+ Defaults to 'interactive-beam-cluster'.
+
+ Example of assigning a default_cluster_name::
+ interactive_beam.clusters.default_cluster_name = 'my-beam-cluster'
+ """
+ self._default_cluster_name = value
+
+ def cleanup(self, pipeline: beam.Pipeline, forcefully=False) -> None:
+ """Attempt to cleanup the Dataproc Cluster corresponding to the given
pipeline.
+
+ If the cluster is not managed by interactive_beam, a corresponding cluster
+ manager is not detected, and deletion is skipped.
+
+ For clusters managed by Interactive Beam, by default, deletion is skipped
+ if any other pipelines are using the cluster.
+
+ Optionally, the cleanup for a cluster managed by Interactive Beam can be
+ forced, by setting the 'forcefully' parameter to True.
+
+ Example usage of default cleanup::
+ interactive_beam.clusters.cleanup(p)
+
+ Example usage of forceful cleanup::
+ interactive_beam.clusters.cleanup(p, forcefully=True)
+ """
+ cluster_manager = ie.current_env().get_dataproc_cluster_manager(pipeline)
+ if cluster_manager:
+ master_url = cluster_manager.master_url
+ if len(self.get_pipelines_using_master_url(master_url)) > 1:
+ if forcefully:
+ _LOGGER.warning(
+ 'Cluster is currently being used by another cluster, but '
+ 'will be forcefully cleaned up.')
+ cluster_manager.cleanup()
+ self._master_urls_to_pipelines[master_url].remove(str(id(pipeline)))
+ else:
+ _LOGGER.warning(
+ 'Cluster is currently being used, skipping deletion.')
+ else:
+ cluster_manager.cleanup()
+ del self._master_urls[master_url]
+ del self._master_urls_to_pipelines[master_url]
+ else:
+ _LOGGER.error(
+ 'No cluster_manager is associated with the provided '
+ 'pipeline!')
+
+ def get_pipelines_using_master_url(self, master_url: str) -> None:
Review comment:
Ditto, make attributes public and use them.
##########
File path:
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -65,13 +66,18 @@ def __init__(
cluster_name)
self._cluster_name = cluster_name
else:
- self._cluster_name = self.DEFAULT_NAME
+ self._cluster_name = ie.current_env().clusters.default_cluster_name
self._cluster_client = dataproc_v1.ClusterControllerClient(
client_options={
'api_endpoint': f'{self._region}-dataproc.googleapis.com:443'
})
+ self.master_url_identifier = MasterURLIdentifier(
Review comment:
Maybe rename it to cluster_metadata and replace the existing attributes
with this parent attribute.
##########
File path:
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -258,31 +274,53 @@ def inspector_with_synthetic(self):
synthetic variables generated by Interactive Beam. Internally used."""
return self._inspector_with_synthetic
+ def cleanup_pipeline(self, pipeline):
+ from apache_beam.runners.interactive import background_caching_job as bcj
+ bcj.attempt_to_cancel_background_caching_job(pipeline)
+ bcj.attempt_to_stop_test_stream_service(pipeline)
+ cache_manager = self.get_cache_manager(pipeline)
+ # Recording manager performs cache manager cleanup during eviction, so we
+ # don't need to clean it up here.
+ if cache_manager and self.get_recording_manager(pipeline) is None:
+ cache_manager.cleanup()
+ if self.options.cleanup_cluster:
Review comment:
Always clean up
##########
File path:
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -17,9 +17,12 @@
# pytype: skip-file
+import dataclasses
Review comment:
from dataclasses import dataclass
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -250,6 +252,34 @@ def cache_root(self, value):
'must be re-executed. ')
self._cache_root = value
+ @property
Review comment:
Let's remove these and always clean up automatically if there is a
cluster manager.
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -329,6 +359,101 @@ def record(self, pipeline):
return recording_manager.record_pipeline()
+class Clusters():
+ """An interface for users to modify the pipelines that are being run by the
+ Interactive Environment.
+
+ Methods of the Interactive Beam Clusters class can be accessed via:
+ interactive_beam.clusters
+
+ Example of calling the Interactive Beam clusters describe method::
+ interactive_beam.clusters.describe()
+ """
+ def __init__(self) -> None:
+ """Instantiates default values for Dataproc cluster interactions.
+ """
+ self._default_cluster_name = 'interactive-beam-cluster'
+ self._master_urls = bidict()
+ self._dataproc_cluster_managers = {}
+ self._master_urls_to_pipelines = {}
+
+ def describe(self, pipeline: Optional[beam.Pipeline]=None) -> dict:
+ """Returns a description of the cluster associated to the given pipeline.
+
+ If no pipeline is given then this returns a dictionary of descriptions for
+ all pipelines.
+ """
+
+ description = ie.current_env().describe_all_clusters()
+ if pipeline:
+ return description.get(pipeline, None)
+ return description
+
+ @property
+ def default_cluster_name(self) -> str:
+ """The default name to be used when creating Dataproc clusters.
+
+ Defaults to 'interactive-beam-cluster'.
+ """
+ return self._default_cluster_name
+
+ @default_cluster_name.setter
+ def default_cluster_name(self, value: bool) -> None:
+ """Sets the default name to be used when creating Dataproc clusters.
+
+ Defaults to 'interactive-beam-cluster'.
+
+ Example of assigning a default_cluster_name::
+ interactive_beam.clusters.default_cluster_name = 'my-beam-cluster'
+ """
+ self._default_cluster_name = value
+
+ def cleanup(self, pipeline: beam.Pipeline, forcefully=False) -> None:
+ """Attempt to cleanup the Dataproc Cluster corresponding to the given
pipeline.
+
+ If the cluster is not managed by interactive_beam, a corresponding cluster
+ manager is not detected, and deletion is skipped.
+
+ For clusters managed by Interactive Beam, by default, deletion is skipped
+ if any other pipelines are using the cluster.
+
+ Optionally, the cleanup for a cluster managed by Interactive Beam can be
+ forced, by setting the 'forcefully' parameter to True.
+
+ Example usage of default cleanup::
+ interactive_beam.clusters.cleanup(p)
+
+ Example usage of forceful cleanup::
+ interactive_beam.clusters.cleanup(p, forcefully=True)
+ """
+ cluster_manager = ie.current_env().get_dataproc_cluster_manager(pipeline)
+ if cluster_manager:
+ master_url = cluster_manager.master_url
+ if len(self.get_pipelines_using_master_url(master_url)) > 1:
+ if forcefully:
+ _LOGGER.warning(
+ 'Cluster is currently being used by another cluster, but '
+ 'will be forcefully cleaned up.')
+ cluster_manager.cleanup()
+ self._master_urls_to_pipelines[master_url].remove(str(id(pipeline)))
Review comment:
Should always be executed whether forcefully or not.
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -329,6 +359,101 @@ def record(self, pipeline):
return recording_manager.record_pipeline()
+class Clusters():
+ """An interface for users to modify the pipelines that are being run by the
+ Interactive Environment.
+
+ Methods of the Interactive Beam Clusters class can be accessed via:
+ interactive_beam.clusters
+
+ Example of calling the Interactive Beam clusters describe method::
+ interactive_beam.clusters.describe()
+ """
+ def __init__(self) -> None:
+ """Instantiates default values for Dataproc cluster interactions.
+ """
+ self._default_cluster_name = 'interactive-beam-cluster'
+ self._master_urls = bidict()
+ self._dataproc_cluster_managers = {}
+ self._master_urls_to_pipelines = {}
+
+ def describe(self, pipeline: Optional[beam.Pipeline]=None) -> dict:
+ """Returns a description of the cluster associated to the given pipeline.
+
+ If no pipeline is given then this returns a dictionary of descriptions for
+ all pipelines.
+ """
+
+ description = ie.current_env().describe_all_clusters()
+ if pipeline:
+ return description.get(pipeline, None)
+ return description
+
+ @property
+ def default_cluster_name(self) -> str:
+ """The default name to be used when creating Dataproc clusters.
+
+ Defaults to 'interactive-beam-cluster'.
+ """
+ return self._default_cluster_name
+
+ @default_cluster_name.setter
+ def default_cluster_name(self, value: bool) -> None:
+ """Sets the default name to be used when creating Dataproc clusters.
+
+ Defaults to 'interactive-beam-cluster'.
+
+ Example of assigning a default_cluster_name::
+ interactive_beam.clusters.default_cluster_name = 'my-beam-cluster'
+ """
+ self._default_cluster_name = value
+
+ def cleanup(self, pipeline: beam.Pipeline, forcefully=False) -> None:
Review comment:
Optional[...]
##########
File path:
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -124,10 +131,23 @@ def create_flink_cluster(self) -> None:
'config': {
'software_config': {
'optional_components': ['DOCKER', 'FLINK']
+ },
+ 'gce_cluster_config': {
+ 'metadata': {
+ 'flink-start-yarn-session': 'true'
+ },
+ 'service_account_scopes': [
+ 'https://www.googleapis.com/auth/cloud-platform'
+ ]
+ },
+ 'endpoint_config': {
+ 'enable_http_port_access': True
}
}
}
self.create_cluster(cluster)
+ self.master_url = self.get_master_url(
Review comment:
Populate this inside create_cluster in both of these cases:
- a cluster was successfully created
- a 409 (already exists) was returned but the cluster is managed by us
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -329,6 +359,101 @@ def record(self, pipeline):
return recording_manager.record_pipeline()
+class Clusters():
+ """An interface for users to modify the pipelines that are being run by the
+ Interactive Environment.
+
+ Methods of the Interactive Beam Clusters class can be accessed via:
+ interactive_beam.clusters
+
+ Example of calling the Interactive Beam clusters describe method::
+ interactive_beam.clusters.describe()
+ """
+ def __init__(self) -> None:
+ """Instantiates default values for Dataproc cluster interactions.
+ """
+ self._default_cluster_name = 'interactive-beam-cluster'
+ self._master_urls = bidict()
+ self._dataproc_cluster_managers = {}
+ self._master_urls_to_pipelines = {}
+
+ def describe(self, pipeline: Optional[beam.Pipeline]=None) -> dict:
+ """Returns a description of the cluster associated to the given pipeline.
+
+ If no pipeline is given then this returns a dictionary of descriptions for
+ all pipelines.
+ """
+
+ description = ie.current_env().describe_all_clusters()
Review comment:
Move logic from ie to this class itself.
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -329,6 +359,101 @@ def record(self, pipeline):
return recording_manager.record_pipeline()
+class Clusters():
+ """An interface for users to modify the pipelines that are being run by the
+ Interactive Environment.
+
+ Methods of the Interactive Beam Clusters class can be accessed via:
+ interactive_beam.clusters
+
+ Example of calling the Interactive Beam clusters describe method::
+ interactive_beam.clusters.describe()
+ """
+ def __init__(self) -> None:
+ """Instantiates default values for Dataproc cluster interactions.
+ """
+ self._default_cluster_name = 'interactive-beam-cluster'
Review comment:
Make all attributes public.
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -329,6 +359,101 @@ def record(self, pipeline):
return recording_manager.record_pipeline()
+class Clusters():
+ """An interface for users to modify the pipelines that are being run by the
+ Interactive Environment.
+
+ Methods of the Interactive Beam Clusters class can be accessed via:
+ interactive_beam.clusters
+
+ Example of calling the Interactive Beam clusters describe method::
+ interactive_beam.clusters.describe()
+ """
+ def __init__(self) -> None:
+ """Instantiates default values for Dataproc cluster interactions.
+ """
+ self._default_cluster_name = 'interactive-beam-cluster'
+ self._master_urls = bidict()
+ self._dataproc_cluster_managers = {}
+ self._master_urls_to_pipelines = {}
+
+ def describe(self, pipeline: Optional[beam.Pipeline]=None) -> dict:
+ """Returns a description of the cluster associated to the given pipeline.
+
+ If no pipeline is given then this returns a dictionary of descriptions for
+ all pipelines.
+ """
+
+ description = ie.current_env().describe_all_clusters()
+ if pipeline:
+ return description.get(pipeline, None)
+ return description
+
+ @property
+ def default_cluster_name(self) -> str:
+ """The default name to be used when creating Dataproc clusters.
+
+ Defaults to 'interactive-beam-cluster'.
+ """
+ return self._default_cluster_name
+
+ @default_cluster_name.setter
+ def default_cluster_name(self, value: bool) -> None:
+ """Sets the default name to be used when creating Dataproc clusters.
+
+ Defaults to 'interactive-beam-cluster'.
+
+ Example of assigning a default_cluster_name::
+ interactive_beam.clusters.default_cluster_name = 'my-beam-cluster'
+ """
+ self._default_cluster_name = value
+
+ def cleanup(self, pipeline: beam.Pipeline, forcefully=False) -> None:
+ """Attempt to cleanup the Dataproc Cluster corresponding to the given
pipeline.
+
+ If the cluster is not managed by interactive_beam, a corresponding cluster
+ manager is not detected, and deletion is skipped.
+
+ For clusters managed by Interactive Beam, by default, deletion is skipped
+ if any other pipelines are using the cluster.
+
+ Optionally, the cleanup for a cluster managed by Interactive Beam can be
+ forced, by setting the 'forcefully' parameter to True.
+
+ Example usage of default cleanup::
+ interactive_beam.clusters.cleanup(p)
+
+ Example usage of forceful cleanup::
+ interactive_beam.clusters.cleanup(p, forcefully=True)
+ """
+ cluster_manager = ie.current_env().get_dataproc_cluster_manager(pipeline)
Review comment:
self._dataproc_cluster_managers.get(id(pipeline), None)
##########
File path:
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -258,31 +274,53 @@ def inspector_with_synthetic(self):
synthetic variables generated by Interactive Beam. Internally used."""
return self._inspector_with_synthetic
+ def cleanup_pipeline(self, pipeline):
+ from apache_beam.runners.interactive import background_caching_job as bcj
+ bcj.attempt_to_cancel_background_caching_job(pipeline)
+ bcj.attempt_to_stop_test_stream_service(pipeline)
+ cache_manager = self.get_cache_manager(pipeline)
+ # Recording manager performs cache manager cleanup during eviction, so we
+ # don't need to clean it up here.
+ if cache_manager and self.get_recording_manager(pipeline) is None:
+ cache_manager.cleanup()
+ if self.options.cleanup_cluster:
+ cluster_manager = self.get_dataproc_cluster_manager(pipeline)
+ if cluster_manager:
Review comment:
Just call self.clusters.cleanup(pipeline)
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -329,6 +359,101 @@ def record(self, pipeline):
return recording_manager.record_pipeline()
+class Clusters():
+ """An interface for users to modify the pipelines that are being run by the
+ Interactive Environment.
+
+ Methods of the Interactive Beam Clusters class can be accessed via:
+ interactive_beam.clusters
+
+ Example of calling the Interactive Beam clusters describe method::
+ interactive_beam.clusters.describe()
+ """
+ def __init__(self) -> None:
+ """Instantiates default values for Dataproc cluster interactions.
+ """
+ self._default_cluster_name = 'interactive-beam-cluster'
+ self._master_urls = bidict()
+ self._dataproc_cluster_managers = {}
+ self._master_urls_to_pipelines = {}
+
+ def describe(self, pipeline: Optional[beam.Pipeline]=None) -> dict:
+ """Returns a description of the cluster associated to the given pipeline.
+
+ If no pipeline is given then this returns a dictionary of descriptions for
+ all pipelines.
+ """
+
+ description = ie.current_env().describe_all_clusters()
+ if pipeline:
+ return description.get(pipeline, None)
+ return description
+
+ @property
+ def default_cluster_name(self) -> str:
+ """The default name to be used when creating Dataproc clusters.
+
+ Defaults to 'interactive-beam-cluster'.
+ """
+ return self._default_cluster_name
+
+ @default_cluster_name.setter
+ def default_cluster_name(self, value: bool) -> None:
+ """Sets the default name to be used when creating Dataproc clusters.
+
+ Defaults to 'interactive-beam-cluster'.
+
+ Example of assigning a default_cluster_name::
+ interactive_beam.clusters.default_cluster_name = 'my-beam-cluster'
+ """
+ self._default_cluster_name = value
+
+ def cleanup(self, pipeline: beam.Pipeline, forcefully=False) -> None:
+ """Attempt to cleanup the Dataproc Cluster corresponding to the given
pipeline.
+
+ If the cluster is not managed by interactive_beam, a corresponding cluster
+ manager is not detected, and deletion is skipped.
+
+ For clusters managed by Interactive Beam, by default, deletion is skipped
+ if any other pipelines are using the cluster.
+
+ Optionally, the cleanup for a cluster managed by Interactive Beam can be
+ forced, by setting the 'forcefully' parameter to True.
+
+ Example usage of default cleanup::
+ interactive_beam.clusters.cleanup(p)
+
+ Example usage of forceful cleanup::
+ interactive_beam.clusters.cleanup(p, forcefully=True)
+ """
+ cluster_manager = ie.current_env().get_dataproc_cluster_manager(pipeline)
+ if cluster_manager:
+ master_url = cluster_manager.master_url
+ if len(self.get_pipelines_using_master_url(master_url)) > 1:
+ if forcefully:
+ _LOGGER.warning(
+ 'Cluster is currently being used by another cluster, but '
+ 'will be forcefully cleaned up.')
+ cluster_manager.cleanup()
+ self._master_urls_to_pipelines[master_url].remove(str(id(pipeline)))
+ else:
+ _LOGGER.warning(
+ 'Cluster is currently being used, skipping deletion.')
+ else:
+ cluster_manager.cleanup()
+ del self._master_urls[master_url]
+ del self._master_urls_to_pipelines[master_url]
+ else:
+ _LOGGER.error(
+ 'No cluster_manager is associated with the provided '
+ 'pipeline!')
+
Review comment:
Add a cleanup everything logic here.
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -209,6 +214,50 @@ def visit_transform(self, transform_node):
return main_job_result
+ # TODO(victorhc): Move this method somewhere else if performance is impacted
+ # by generating a cluster during runtime.
+ def _create_dataproc_cluster_if_applicable(self, user_pipeline):
+ """ Creates a Dataproc cluster if the provided user_pipeline is running
+ FlinkRunner and no flink_master_url was provided as an option. A cluster
+ is not created when a flink_master_url is detected.
+
+ Example pipeline options to enable automatic Dataproc cluster creation:
+ options = PipelineOptions([
+ '--runner=FlinkRunner',
+ '--project=my-project',
+ '--region=my-region',
+ '--environment_type=DOCKER'
+ ])
+
+ Example pipeline options to skip automatic Dataproc cluster creation:
+ options = PipelineOptions([
+ '--runner=FlinkRunner',
+ '--flink_master=example.internal:41979',
+ '--environment_type=DOCKER'
+ ])
+ """
+ from apache_beam.options.pipeline_options import FlinkRunnerOptions
+ flink_master = user_pipeline.options.view_as(
+ FlinkRunnerOptions).flink_master
+ if flink_master != '[auto]':
+ _LOGGER.info(
+ 'Skipping Dataproc cluster creation as a flink_master_url '
+ 'was detected.')
+ else:
+ from apache_beam.runners.portability.flink_runner import FlinkRunner
+ if isinstance(self._underlying_runner, FlinkRunner):
+ if not ie.current_env().get_dataproc_cluster_manager(user_pipeline):
+ ie.current_env().set_dataproc_cluster_manager(user_pipeline)
+ cluster_manager = ie.current_env().get_dataproc_cluster_manager(
+ user_pipeline)
+ cluster_manager.create_flink_cluster()
+ master_url = cluster_manager.master_url
+ master_url_identifier = cluster_manager.master_url_identifier
+
ie.current_env().clusters._master_urls_to_pipelines.setdefault(master_url, []).
\
+ append(str(id(user_pipeline)))
+ ie.current_env().clusters._master_urls[master_url] = \
+ master_url_identifier
Review comment:
Just do the if when we should manage a cluster for the user.
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -209,6 +214,50 @@ def visit_transform(self, transform_node):
return main_job_result
+ # TODO(victorhc): Move this method somewhere else if performance is impacted
+ # by generating a cluster during runtime.
+ def _create_dataproc_cluster_if_applicable(self, user_pipeline):
+ """ Creates a Dataproc cluster if the provided user_pipeline is running
+ FlinkRunner and no flink_master_url was provided as an option. A cluster
+ is not created when a flink_master_url is detected.
+
+ Example pipeline options to enable automatic Dataproc cluster creation:
+ options = PipelineOptions([
+ '--runner=FlinkRunner',
+ '--project=my-project',
+ '--region=my-region',
+ '--environment_type=DOCKER'
+ ])
+
+ Example pipeline options to skip automatic Dataproc cluster creation:
+ options = PipelineOptions([
+ '--runner=FlinkRunner',
+ '--flink_master=example.internal:41979',
+ '--environment_type=DOCKER'
+ ])
+ """
+ from apache_beam.options.pipeline_options import FlinkRunnerOptions
+ flink_master = user_pipeline.options.view_as(
+ FlinkRunnerOptions).flink_master
+ if flink_master != '[auto]':
+ _LOGGER.info(
+ 'Skipping Dataproc cluster creation as a flink_master_url '
+ 'was detected.')
+ else:
+ from apache_beam.runners.portability.flink_runner import FlinkRunner
+ if isinstance(self._underlying_runner, FlinkRunner):
+ if not ie.current_env().get_dataproc_cluster_manager(user_pipeline):
+ ie.current_env().set_dataproc_cluster_manager(user_pipeline)
Review comment:
Move the logic from ie to here to create a cluster manager, then create
flink cluster iff no cluster manager found for this pipeline.
##########
File path:
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -258,31 +274,53 @@ def inspector_with_synthetic(self):
synthetic variables generated by Interactive Beam. Internally used."""
return self._inspector_with_synthetic
+ def cleanup_pipeline(self, pipeline):
+ from apache_beam.runners.interactive import background_caching_job as bcj
+ bcj.attempt_to_cancel_background_caching_job(pipeline)
+ bcj.attempt_to_stop_test_stream_service(pipeline)
+ cache_manager = self.get_cache_manager(pipeline)
+ # Recording manager performs cache manager cleanup during eviction, so we
+ # don't need to clean it up here.
+ if cache_manager and self.get_recording_manager(pipeline) is None:
+ cache_manager.cleanup()
+ if self.options.cleanup_cluster:
+ cluster_manager = self.get_dataproc_cluster_manager(pipeline)
+ if cluster_manager:
+ master_url = cluster_manager.master_url
+ if len(self.clusters.get_pipelines_using_master_url(master_url)) > 1:
+ _LOGGER.warning(
+ 'Cluster is currently being used, skipping deletion.')
+ else:
+ cluster_manager.cleanup()
+
+ def cleanup_environment(self):
+ for _, job in self._background_caching_jobs.items():
+ if job:
+ job.cancel()
+ for _, controller in self._test_stream_service_controllers.items():
+ if controller:
+ controller.stop()
+ for pipeline_id, cache_manager in self._cache_managers.items():
+ # Recording manager performs cache manager cleanup during eviction, so
+ # we don't need to clean it up here.
+ if cache_manager and pipeline_id not in self._recording_managers:
+ cache_manager.cleanup()
+ if self.options.cleanup_cluster:
Review comment:
Also call self.clusters.cleanup()
##########
File path:
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -549,6 +589,50 @@ def set_cached_source_signature(self, pipeline, signature):
def get_cached_source_signature(self, pipeline):
return self._cached_source_signature.get(str(id(pipeline)), set())
+ def set_dataproc_cluster_manager(self, pipeline):
+ """Sets the instance of DataprocClusterManager to be used by the
+ pipeline.
+ """
+ if self._is_in_ipython:
+ warnings.filterwarnings(
+ 'ignore',
+ 'options is deprecated since First stable release. References to '
+ '<pipeline>.options will not be supported',
+ category=DeprecationWarning)
+ project_id = (pipeline.options.view_as(GoogleCloudOptions).project)
+ region = (pipeline.options.view_as(GoogleCloudOptions).region)
+ cluster_name = self.clusters.default_cluster_name
+ cluster_manager = DataprocClusterManager(
+ project_id=project_id, region=region, cluster_name=cluster_name)
+ self.clusters._dataproc_cluster_managers[str(id(pipeline))] =
cluster_manager
+
+ def get_dataproc_cluster_manager(self, pipeline):
Review comment:
Move them to Clusters class
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]