KevinGG commented on a change in pull request #17127:
URL: https://github.com/apache/beam/pull/17127#discussion_r839025085
##########
File path:
sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py
##########
@@ -136,6 +137,38 @@ def get_pcoll_data(self, identifier,
include_window_info=False):
return dataframe.to_json(orient='table')
return {}
+ @as_json
+ def list_clusters(self):
+ """Retrieves information for all clusters as a json.
+
+ The json object maps a unique obfuscated identifier of a cluster to
+ the corresponding cluster_name, project, region, master_url, dashboard,
+ and pipelines. Furthermore, copies the mapping to self._clusters.
+ """
+ from apache_beam.runners.interactive import interactive_environment as ie
+ clusters = ie.current_env().clusters
+ all_cluster_data = {}
+ for master_url in clusters.master_urls:
+ cluster_metadata = clusters.master_urls[master_url]
+ project = cluster_metadata.project_id
+ region = cluster_metadata.region
+ name = cluster_metadata.cluster_name
+
+ all_cluster_data[obfuscate(project, region, name)] = {
+ 'cluster_name': name,
+ 'project': project,
+ 'region': region,
+ 'master_url': master_url,
+ 'dashboard': clusters.master_urls_to_dashboards[master_url],
+ 'pipelines': clusters.master_urls_to_pipelines[master_url]
+ }
+ self._clusters = all_cluster_data
+ return all_cluster_data
+
+ def get_cluster_data(self):
Review comment:
Ditto the comment for TS invocation:
instead of
```
def get_cluster_data(self):
return self._clusters
```
change it to
```
def get_cluster_data(self, id:str):
return self._clusters[id] # The id is guaranteed to exist.
```
so that the obfuscated id stays within the scope of this module and never leaks
to other in-kernel modules.
You can even make it more explicit by having
```
def get_cluster_master_url_identifier(self, id:str) -> MasterURLIdentifier:
# or str master_url whichever you find easier.
```
Then use corresponding API in your Clusters class as needed.
##########
File path:
sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/clusters/Clusters.tsx
##########
@@ -0,0 +1,289 @@
+// Licensed under the Apache License, Version 2.0 (the 'License'); you may not
+// use this file except in compliance with the License. You may obtain a copy
of
+// the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations
under
+// the License.
+
+import * as React from 'react';
+
+import { ISessionContext } from '@jupyterlab/apputils';
+
+import { Button } from '@rmwc/button';
+import {
+ DataTable,
+ DataTableContent,
+ DataTableRow,
+ DataTableHeadCell
+} from '@rmwc/data-table';
+
+import {
+ Dialog,
+ DialogTitle,
+ DialogContent,
+ DialogActions,
+ DialogButton
+} from '@rmwc/dialog';
+
+import { Fab } from '@rmwc/fab';
+
+import { Select } from '@rmwc/select';
+
+import {
+ TopAppBar,
+ TopAppBarFixedAdjust,
+ TopAppBarRow,
+ TopAppBarSection,
+ TopAppBarTitle
+} from '@rmwc/top-app-bar';
+
+import { KernelModel } from '../kernel/KernelModel';
+
+import '@rmwc/button/styles';
+import '@rmwc/data-table/styles';
+import '@rmwc/dialog/styles';
+import '@rmwc/fab/styles';
+import '@rmwc/select/styles';
+import '@rmwc/top-app-bar/styles';
+
+interface IClustersProps {
+ sessionContext: ISessionContext;
+}
+
+interface IClustersState {
+ kernelDisplayName: string;
+ clusters: object;
+ defaultClusterId: string;
+ selectedId: string;
+ selectedName: string;
+ showDialog: boolean;
+ displayTable: boolean;
+}
+
+/**
+ * This component is an interactive data-table that inspects Dataproc clusters
+ * managed by Apache Beam.
+ *
+ * The following user functionality is provided in this component:
+ * 1. View all Dataproc clusters managed by Interactive Beam
+ * 2. Delete a selected cluster. This will "reset" the cluster used by the
+ * corresponding pipelines and the cluster will be recreated when the
+ * pipeline is executed again.
+ * 3. Select a default cluster. This is the cluster that will be used by
+ * Interactive Beam when no master_url or Google Cloud project is
+ * specified in the pipeline options.
+ */
+export class Clusters extends React.Component<IClustersProps, IClustersState> {
+ constructor(props: IClustersProps) {
+ super(props);
+ this._inspectKernelCode =
+ 'from apache_beam.runners.interactive ' +
+ 'import interactive_environment as ie\n' +
+ 'ie.current_env().inspector.list_clusters()';
+ this._model = new KernelModel(this.props.sessionContext);
+ this.state = {
+ kernelDisplayName: 'no kernel',
+ clusters: this._model.executeResult,
+ defaultClusterId: '',
+ selectedId: '',
+ selectedName: '',
+ showDialog: false,
+ displayTable: true
+ };
+ }
+
+ componentDidMount(): void {
+ this._queryKernelTimerId = setInterval(
+ () => this.queryKernel(this._inspectKernelCode),
+ 2000
+ );
+ this._updateRenderTimerId = setInterval(() => this.updateRender(), 1000);
+ this._updateSessionInfoTimerId = setInterval(
+ () => this.updateSessionInfo(),
+ 2000
+ );
+ }
+
+ componentWillUnmount(): void {
+ clearInterval(this._queryKernelTimerId);
+ clearInterval(this._updateRenderTimerId);
+ clearInterval(this._updateSessionInfoTimerId);
+ }
+
+ queryKernel(code: string): void {
+ this._model.execute(code);
+ }
+
+ updateRender(): void {
+ const clustersToUpdate = this._model.executeResult;
+ if (Object.keys(clustersToUpdate).length) {
+ this.setState({ displayTable: true });
+ if (
+ JSON.stringify(this.state.clusters) !==
JSON.stringify(clustersToUpdate)
+ ) {
+ this.setState({ clusters: clustersToUpdate });
+ }
+ }
+ }
+
+ updateSessionInfo(): void {
+ if (this.props.sessionContext) {
+ const newKernelDisplayName = this.props.sessionContext.kernelDisplayName;
+ if (newKernelDisplayName !== this.state.kernelDisplayName) {
+ this.setState({
+ kernelDisplayName: newKernelDisplayName
+ });
+ }
+ }
+ }
+
+ setDefaultCluster(cluster_id: string): void {
+ const setDefaultClusterCode =
+ 'ie.current_env().clusters.set_default_cluster' + `('${cluster_id}')`;
+ this.queryKernel(setDefaultClusterCode);
+ this.setState({ defaultClusterId: cluster_id });
+ }
+
+ displayDialog(open: boolean, key: string, clusterName: string): void {
+ this.setState({
+ showDialog: open,
+ selectedId: key,
+ selectedName: clusterName
+ });
+ }
+
+ deleteCluster(cluster_id: string): void {
+ const deleteClusterCode =
+ 'ie.current_env().clusters.delete_cluster' + `('${cluster_id}')`;
Review comment:
Can we change this from
```
'ie.current_env().clusters.delete_cluster' + `('${cluster_id}')`
```
to
```
'ie.current_env().clusters.delete_cluster' +
`(ie.current_env().inspector.get_cluster_data('${cluster_id}'))`
```
So in the inspector class, instead of
```
def get_cluster_data(self):
return self._clusters
```
change it to
```
def get_cluster_data(self, id:str):
return self._clusters[id] # The id is guaranteed to exist.
```
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -267,9 +267,12 @@ def _get_dataproc_cluster_master_url_if_applicable(
category=DeprecationWarning)
project_id =
(user_pipeline.options.view_as(GoogleCloudOptions).project)
region = (user_pipeline.options.view_as(GoogleCloudOptions).region)
- cluster_name = ie.current_env().clusters.default_cluster_name
- cluster_metadata = MasterURLIdentifier(
- project_id=project_id, region=region, cluster_name=cluster_name)
+ if not project_id:
+ cluster_metadata = ie.current_env().clusters.default_cluster_metadata
+ else:
+ cluster_name = ie.current_env().clusters.default_cluster_name
+ cluster_metadata = MasterURLIdentifier(
+ project_id=project_id, region=region, cluster_name=cluster_name)
Review comment:
SG, please add a brief comment in the code for future viewers.
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -443,6 +448,31 @@ def cleanup(
self.master_urls_to_pipelines.clear()
self.master_urls_to_dashboards.clear()
+ def delete_cluster(self, id: str):
+ """Deletes the cluster with the given obfuscated identifier from the
+ Interactive Environment, as well as from Dataproc. Additionally, unassigns
+ the 'flink_master' pipeline option for all impacted pipelines.
+ """
+ clusters_data = ie.current_env().inspector.get_cluster_data()
+ pipelines = [
+ ie.current_env().pipeline_id_to_pipeline(pid)
+ for pid in clusters_data[id]['pipelines']
+ ]
+ for p in pipelines:
+ ie.current_env().clusters.cleanup(p)
+ p.options.view_as(FlinkRunnerOptions).flink_master = '[auto]'
+
+ def set_default_cluster(self, id: str):
Review comment:
Ditto.
You can define it as:
```
def set_default_cluster(self, master:Union[str, MasterURLIdentifier]):
```
##########
File path:
sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/clusters/Clusters.tsx
##########
@@ -0,0 +1,289 @@
+// Licensed under the Apache License, Version 2.0 (the 'License'); you may not
+// use this file except in compliance with the License. You may obtain a copy
of
+// the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations
under
+// the License.
+
+import * as React from 'react';
+
+import { ISessionContext } from '@jupyterlab/apputils';
+
+import { Button } from '@rmwc/button';
+import {
+ DataTable,
+ DataTableContent,
+ DataTableRow,
+ DataTableHeadCell
+} from '@rmwc/data-table';
+
+import {
+ Dialog,
+ DialogTitle,
+ DialogContent,
+ DialogActions,
+ DialogButton
+} from '@rmwc/dialog';
+
+import { Fab } from '@rmwc/fab';
+
+import { Select } from '@rmwc/select';
+
+import {
+ TopAppBar,
+ TopAppBarFixedAdjust,
+ TopAppBarRow,
+ TopAppBarSection,
+ TopAppBarTitle
+} from '@rmwc/top-app-bar';
+
+import { KernelModel } from '../kernel/KernelModel';
+
+import '@rmwc/button/styles';
+import '@rmwc/data-table/styles';
+import '@rmwc/dialog/styles';
+import '@rmwc/fab/styles';
+import '@rmwc/select/styles';
+import '@rmwc/top-app-bar/styles';
+
+interface IClustersProps {
+ sessionContext: ISessionContext;
+}
+
+interface IClustersState {
+ kernelDisplayName: string;
+ clusters: object;
+ defaultClusterId: string;
+ selectedId: string;
+ selectedName: string;
+ showDialog: boolean;
+ displayTable: boolean;
+}
+
+/**
+ * This component is an interactive data-table that inspects Dataproc clusters
+ * managed by Apache Beam.
+ *
+ * The following user functionality is provided in this component:
+ * 1. View all Dataproc clusters managed by Interactive Beam
+ * 2. Delete a selected cluster. This will "reset" the cluster used by the
+ * corresponding pipelines and the cluster will be recreated when the
+ * pipeline is executed again.
+ * 3. Select a default cluster. This is the cluster that will be used by
+ * Interactive Beam when no master_url or Google Cloud project is
+ * specified in the pipeline options.
+ */
+export class Clusters extends React.Component<IClustersProps, IClustersState> {
+ constructor(props: IClustersProps) {
+ super(props);
+ this._inspectKernelCode =
+ 'from apache_beam.runners.interactive ' +
+ 'import interactive_environment as ie\n' +
Review comment:
Imports are not needed because the KernelModel always imports for you.
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -443,6 +448,31 @@ def cleanup(
self.master_urls_to_pipelines.clear()
self.master_urls_to_dashboards.clear()
+ def delete_cluster(self, id: str):
Review comment:
Do not leak the obfuscated id from inspector to this module because it's
meaningless in other in-kernel modules.
See the comment for inspector below for more details.
You can define this as
```
def delete_cluster(self, master: Union[str, MasterURLIdentifier]):
# get pipelines from self._master_urls_to_pipelines
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]