KevinGG commented on a change in pull request #16904:
URL: https://github.com/apache/beam/pull/16904#discussion_r816261386
##########
File path:
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -186,15 +210,108 @@ def describe(self) -> None:
"""Returns a dictionary describing the cluster."""
return {
'cluster_metadata': self.cluster_metadata,
- 'master_url': self.master_url
+ 'master_url': self.master_url,
+ 'dashboard': self.dashboard
}
- def get_master_url(self, identifier) -> None:
+ def get_cluster_details(self, cluster_metadata: MasterURLIdentifier):
+ """Gets the Dataproc_v1 Cluster object for the current cluster manager."""
+ try:
+ return self._cluster_client.get_cluster(
+ request={
+ 'project_id': cluster_metadata.project_id,
+ 'region': cluster_metadata.region,
+ 'cluster_name': cluster_metadata.cluster_name
+ })
+ except Exception as e:
+ if e.code == 403:
+ _LOGGER.error(
+ 'Due to insufficient project permissions, '
+ 'unable to retrieve information for cluster: %s',
+ cluster_metadata.cluster_name)
+ raise ValueError(
+ 'You cannot view clusters in project: {}'.format(
+ cluster_metadata.project_id))
+ elif e.code == 404:
+ _LOGGER.error(
+ 'Cluster does not exist: %s', cluster_metadata.cluster_name)
+ raise ValueError(
+ 'Cluster was not found: {}'.format(cluster_metadata.cluster_name))
+ else:
+ _LOGGER.error(
+ 'Failed to get information for cluster: %s',
+ cluster_metadata.cluster_name)
+ raise e
+
+ def wait_for_cluster_to_provision(
+ self, cluster_metadata: MasterURLIdentifier) -> None:
+ while self.get_cluster_details(
+ cluster_metadata).status.state.name == 'CREATING':
+ time.sleep(15)
+
+ def get_staging_location(self, cluster_metadata: MasterURLIdentifier) -> str:
+ """Gets the staging bucket of an existing Dataproc cluster."""
+ try:
+ daemon_thread = Thread(
+ target=self.wait_for_cluster_to_provision(cluster_metadata))
+ daemon_thread.setDaemon(True)
+ daemon_thread.start()
+ cluster_details = self.get_cluster_details(cluster_metadata)
+ bucket_name = cluster_details.config.config_bucket
+ gcs_path = 'gs://' + bucket_name + '/google-cloud-dataproc-metainfo/'
+ for file in self._fs._list(gcs_path):
+ if cluster_metadata.cluster_name in file.path:
+ # this file path split will look something like:
+ # ['gs://.../google-cloud-dataproc-metainfo/{staging_dir}/',
+ # '-{node-type}/dataproc-startup-script_output']
+ return file.path.split(cluster_metadata.cluster_name)[0]
+ except Exception as e:
+ _LOGGER.error(
+ 'Failed to get %s cluster staging bucket.',
+ cluster_metadata.cluster_name)
+ raise e
+
+ def parse_master_url_and_dashboard(
+ self, cluster_metadata: MasterURLIdentifier,
+ line: str) -> Tuple[str, str]:
+ """Parses the master_url and YARN application_id of the Flink process from
+ an input line.
Review comment:
Explain what it looks like by providing an anonymized example line.
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -220,7 +229,7 @@ def visit_transform(self, transform_node):
# TODO(victorhc): Move this method somewhere else if performance is impacted
# by generating a cluster during runtime.
- def _create_dataproc_cluster_if_applicable(self, user_pipeline):
+ def _get_dataproc_cluster_master_url_if_applicable(self, user_pipeline):
Review comment:
Please add the type hints.
##########
File path:
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -127,19 +148,20 @@ def create_cluster(self, cluster: dict) -> None:
'Unable to create cluster: %s', self.cluster_metadata.cluster_name)
raise e
- # TODO(victorhc): Add support for user-specified pip packages
def create_flink_cluster(self) -> None:
"""Calls _create_cluster with a configuration that enables FlinkRunner."""
cluster = {
'project_id': self.cluster_metadata.project_id,
'cluster_name': self.cluster_metadata.cluster_name,
'config': {
'software_config': {
+ 'image_version': '2.0.31-debian10',
'optional_components': ['DOCKER', 'FLINK']
},
'gce_cluster_config': {
'metadata': {
- 'flink-start-yarn-session': 'true'
+ 'flink-start-yarn-session': 'true',
+ 'PIP_PACKAGES': 'apache-beam[gcp]==2.35.0'
Review comment:
Get it from the
[version.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/version.py)
module.
##########
File path:
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -186,15 +210,108 @@ def describe(self) -> None:
"""Returns a dictionary describing the cluster."""
return {
'cluster_metadata': self.cluster_metadata,
- 'master_url': self.master_url
+ 'master_url': self.master_url,
+ 'dashboard': self.dashboard
}
- def get_master_url(self, identifier) -> None:
+ def get_cluster_details(self, cluster_metadata: MasterURLIdentifier):
Review comment:
Add a type hint if possible. You can add a fake Cluster class in your
except part of module imports in this file.
##########
File path:
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -186,15 +210,108 @@ def describe(self) -> None:
"""Returns a dictionary describing the cluster."""
return {
'cluster_metadata': self.cluster_metadata,
- 'master_url': self.master_url
+ 'master_url': self.master_url,
+ 'dashboard': self.dashboard
}
- def get_master_url(self, identifier) -> None:
+ def get_cluster_details(self, cluster_metadata: MasterURLIdentifier):
+ """Gets the Dataproc_v1 Cluster object for the current cluster manager."""
+ try:
+ return self._cluster_client.get_cluster(
+ request={
+ 'project_id': cluster_metadata.project_id,
+ 'region': cluster_metadata.region,
+ 'cluster_name': cluster_metadata.cluster_name
+ })
+ except Exception as e:
+ if e.code == 403:
+ _LOGGER.error(
+ 'Due to insufficient project permissions, '
+ 'unable to retrieve information for cluster: %s',
+ cluster_metadata.cluster_name)
+ raise ValueError(
+ 'You cannot view clusters in project: {}'.format(
+ cluster_metadata.project_id))
+ elif e.code == 404:
+ _LOGGER.error(
+ 'Cluster does not exist: %s', cluster_metadata.cluster_name)
+ raise ValueError(
+ 'Cluster was not found: {}'.format(cluster_metadata.cluster_name))
+ else:
+ _LOGGER.error(
+ 'Failed to get information for cluster: %s',
+ cluster_metadata.cluster_name)
+ raise e
+
+ def wait_for_cluster_to_provision(
+ self, cluster_metadata: MasterURLIdentifier) -> None:
+ while self.get_cluster_details(
+ cluster_metadata).status.state.name == 'CREATING':
+ time.sleep(15)
+
+ def get_staging_location(self, cluster_metadata: MasterURLIdentifier) -> str:
+ """Gets the staging bucket of an existing Dataproc cluster."""
+ try:
+ daemon_thread = Thread(
Review comment:
The daemon thread usage nullifies the blocking nature of the function
`wait_for_cluster_to_provision`.
The idea to use a daemon thread is to create the cluster and fetch
information of it in a non-blocking way. But I do think applying it makes the
code more complicated. Let's avoid it for now and make the code blocking by
itself.
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -452,8 +457,7 @@ def cleanup(
# Examples:
# ib.clusters.describe(p)
# Check the docstrings for detailed usages.
-# TODO(victorhc): Implement all functionality for Clusters()
-# clusters = Clusters()
+clusters = Clusters()
Review comment:
Let's not enable it until we have a working example. You can update the
TODO item to be "Resolve connection issue and add a working example"
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -137,6 +137,20 @@ def run_pipeline(self, pipeline, options):
watch_sources(pipeline)
user_pipeline = ie.current_env().user_pipeline(pipeline)
+ if user_pipeline:
+ # When the underlying_runner is a FlinkRunner instance, create a
+ # corresponding DataprocClusterManager for it if no flink_master_url
+ # is provided.
+ master_url = self._get_dataproc_cluster_master_url_if_applicable(
+ user_pipeline)
+ if master_url:
+ flink_master_option = '--flink_master={}'.format(master_url)
Review comment:
Try to use the `view_as(FlinkOptions)` function instead of parsing it as
a dict.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]