VictorPlusC commented on a change in pull request #17127:
URL: https://github.com/apache/beam/pull/17127#discussion_r836857307
##########
File path: sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py
##########
@@ -136,6 +138,60 @@ def get_pcoll_data(self, identifier, include_window_info=False):
       return dataframe.to_json(orient='table')
     return {}
+  @as_json
+  def list_clusters(self):
+    """Retrieves information for all clusters as a json.
+
+    The json object maps a unique obfuscated identifier of a cluster to
+    the corresponding cluster_name, project, region, master_url, dashboard,
+    and pipelines. Furthermore, copies the mapping to self._clusters.
+    """
+    from apache_beam.runners.interactive import interactive_environment as ie
+    clusters = ie.current_env().clusters
+    all_cluster_data = {}
+    for master_url in clusters.master_urls:
+      cluster_metadata = clusters.master_urls[master_url]
+      project = cluster_metadata.project_id
+      region = cluster_metadata.region
+      name = cluster_metadata.cluster_name
+
+      all_cluster_data[obfuscate(project, region, name)] = {
+          'cluster_name': name,
+          'project': project,
+          'region': region,
+          'master_url': master_url,
+          'dashboard': clusters.master_urls_to_dashboards[master_url],
+          'pipelines': clusters.master_urls_to_pipelines[master_url]
+      }
+    self._clusters = all_cluster_data
+    return all_cluster_data
+
+  def delete_cluster(self, id: str):
+    """Deletes the cluster with the given obfuscated identifier from the
+    Interactive Environment, as well as from Dataproc. Additionally,
+    unassigns the 'flink_master' pipeline option for all impacted pipelines.
+    """
+    from apache_beam.runners.interactive import interactive_environment as ie
+    pipelines = [
+        ie.current_env().pipeline_id_to_pipeline(pid)
+        for pid in self._clusters[id]['pipelines']
+    ]
+    for p in pipelines:
+      ie.current_env().clusters.cleanup(p)
+      p.options.view_as(FlinkRunnerOptions).flink_master = '[auto]'
+    del self._clusters[id]
Review comment:
This should not, because ib.clusters.cleanup(p) will be invoked. That method
deletes all cluster mappings corresponding to the given pipeline. We use
ib.clusters.cleanup(p) rather than cleaning up the pipeline in the environment
with ie.current_env().cleanup(p) because we want delete_cluster() to remove
only the active cluster without interfering with any existing pipelines. The
intended behavior is for clusters to be recreated at runtime if they have been
deleted. This way, users can run a pipeline, delete the cluster while they are
not using it, and continue building PCollections from the pipeline when they
need it again.
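To make the lifecycle above concrete, here is a minimal, self-contained toy model. Everything in it is hypothetical: `ToyPipeline`, `ToyClusters`, `ensure_cluster`, `run`, and `delete_cluster` are illustrative stand-ins, not Beam APIs. It only assumes the behavior described in this comment: cleanup removes the pipeline's cluster mapping (not the pipeline itself), `flink_master` is reset to `'[auto]'`, and a new cluster is created the next time the pipeline runs.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ToyPipeline:
  """Hypothetical pipeline: an id, a flink_master option, and built-up state."""
  pipeline_id: str
  flink_master: str = '[auto]'
  pcollections: List[str] = field(default_factory=list)


@dataclass
class ToyClusters:
  """Hypothetical stand-in for the environment's pipeline-to-cluster mapping."""
  master_urls: Dict[str, str] = field(default_factory=dict)
  clusters_created: int = 0

  def ensure_cluster(self, pipeline: ToyPipeline) -> str:
    # (Re)create a cluster at runtime if the pipeline has none mapped.
    if pipeline.pipeline_id not in self.master_urls:
      self.clusters_created += 1
      self.master_urls[pipeline.pipeline_id] = (
          f'cluster-{self.clusters_created}:8081')
    return self.master_urls[pipeline.pipeline_id]

  def cleanup(self, pipeline: ToyPipeline) -> None:
    # Remove only the cluster mapping; the pipeline object is untouched.
    self.master_urls.pop(pipeline.pipeline_id, None)


def run(clusters: ToyClusters, pipeline: ToyPipeline, pcoll: str) -> str:
  """Builds a PCollection on the pipeline, then 'runs' it on a cluster."""
  pipeline.pcollections.append(pcoll)
  if pipeline.flink_master == '[auto]':
    pipeline.flink_master = clusters.ensure_cluster(pipeline)
  return pipeline.flink_master


def delete_cluster(clusters: ToyClusters, pipelines: List[ToyPipeline]) -> None:
  """Mirrors the PR's loop: drop cluster mappings, reset flink_master."""
  for p in pipelines:
    clusters.cleanup(p)
    p.flink_master = '[auto]'


clusters = ToyClusters()
p = ToyPipeline('p1')
first = run(clusters, p, 'words')    # creates cluster-1
delete_cluster(clusters, [p])        # mapping removed, option reset
second = run(clusters, p, 'counts')  # recreates a cluster on demand
print(first, second, p.pcollections)
# prints: cluster-1:8081 cluster-2:8081 ['words', 'counts']
```

In this toy model the pipeline's `pcollections` list survives the deletion, which is the property the comment relies on: only the cluster is discarded, and the next run transparently provisions a new one.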
--
This is an automated message from the Apache Git Service.