[ https://issues.apache.org/jira/browse/BEAM-14332?focusedWorklogId=765727&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765727 ]
ASF GitHub Bot logged work on BEAM-14332:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 03/May/22 22:30
Start Date: 03/May/22 22:30
Worklog Time Spent: 10m
Work Description: rohdesamuel commented on code in PR #17402:
URL: https://github.com/apache/beam/pull/17402#discussion_r864286373
##########
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py:
##########
@@ -40,45 +40,25 @@ class UnimportedDataproc:
_LOGGER = logging.getLogger(__name__)
+DATAPROC_STAGING_LOG_NAME = 'dataproc-startup-script_output'
Review Comment:
Can you add a comment for what this constant is used for?
##########
sdks/python/apache_beam/runners/interactive/interactive_beam.py:
##########
@@ -339,151 +342,229 @@ def record(self, pipeline):
class Clusters:
- """An interface for users to modify the pipelines that are being run by the
- Interactive Environment.
+ """An interface to manage clusters running workers that are connected with
+ the current interactive environment.
- Methods of the Interactive Beam Clusters class can be accessed via:
- from apache_beam.runners.interactive import interactive_beam as ib
- ib.clusters
+ This module is experimental. No backwards-compatibility guarantees.
- Example of calling the Interactive Beam clusters describe method::
- ib.clusters.describe()
+ Interactive Beam automatically creates/reuses existing worker clusters to
+ execute pipelines when it detects the need from configurations.
+ Currently, the only supported cluster implementation is Flink running on
+ Cloud Dataproc.
+
+ To configure a pipeline to run on Cloud Dataproc with Flink, set the
+ underlying runner of the InteractiveRunner to FlinkRunner and the pipeline
+ options to indicate where on Cloud the FlinkRunner should be deployed to.
+
+ An example to enable automatic Dataproc cluster creation/reuse::
+
+ options = PipelineOptions([
+ '--project=my-project',
+ '--region=my-region',
+ '--environment_type=DOCKER'])
+ pipeline = beam.Pipeline(InteractiveRunner(
+ underlying_runner=FlinkRunner()), options=options)
+
+ Reusing the same pipeline options in another pipeline configures Interactive
+ Beam to reuse the same Dataproc cluster implicitly managed by the current
+ interactive environment.
+ If a flink_master is identified as a known cluster, the corresponding cluster
+ is also reused.
+ Furthermore, if a cluster is explicitly created by using a pipeline as an
+ identifier to a known cluster, the cluster is reused.
+
+ An example::
+
+ # dcm == ib.clusters.pipelines.get(pipeline), no cluster is newly created.
Review Comment:
Feels a little weird to access a member in a class to get the cluster
manager. Consider changing this into a method like `ib.clusters.get(pipeline)`.
This flows a bit better.
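
A minimal sketch of the suggested accessor, assuming a simplified stand-in for `Clusters`. The class name `ClusterRegistry`, the `register` helper, and the use of plain objects in place of `beam.Pipeline` and `DataprocClusterManager` are hypothetical and not part of the PR:

```python
from typing import Dict, Optional


class ClusterRegistry:
  """Hypothetical, simplified stand-in for the Clusters class in the diff."""
  def __init__(self) -> None:
    # Maps a pipeline object to the cluster manager that serves it.
    self._pipelines: Dict[object, object] = {}

  def register(self, pipeline: object, manager: object) -> None:
    """Hypothetical helper that records which manager serves a pipeline."""
    self._pipelines[pipeline] = manager

  def get(self, pipeline: object) -> Optional[object]:
    """Returns the manager known for the pipeline, or None.

    Unlike create(), a lookup never provisions a new cluster.
    """
    return self._pipelines.get(pipeline)
```

With an accessor like this, the docstring example could read `dcm == ib.clusters.get(pipeline)` and the `pipelines` dict could stay an implementation detail.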
##########
sdks/python/apache_beam/runners/interactive/interactive_beam.py:
##########
@@ -339,151 +342,229 @@ def record(self, pipeline):
class Clusters:
- """An interface for users to modify the pipelines that are being run by the
- Interactive Environment.
+ """An interface to manage clusters running workers that are connected with
+ the current interactive environment.
- Methods of the Interactive Beam Clusters class can be accessed via:
- from apache_beam.runners.interactive import interactive_beam as ib
- ib.clusters
+ This module is experimental. No backwards-compatibility guarantees.
- Example of calling the Interactive Beam clusters describe method::
- ib.clusters.describe()
+ Interactive Beam automatically creates/reuses existing worker clusters to
+ execute pipelines when it detects the need from configurations.
+ Currently, the only supported cluster implementation is Flink running on
+ Cloud Dataproc.
+
+ To configure a pipeline to run on Cloud Dataproc with Flink, set the
+ underlying runner of the InteractiveRunner to FlinkRunner and the pipeline
+ options to indicate where on Cloud the FlinkRunner should be deployed to.
+
+ An example to enable automatic Dataproc cluster creation/reuse::
+
+ options = PipelineOptions([
+ '--project=my-project',
+ '--region=my-region',
+ '--environment_type=DOCKER'])
+ pipeline = beam.Pipeline(InteractiveRunner(
+ underlying_runner=FlinkRunner()), options=options)
+
+ Reusing the same pipeline options in another pipeline configures Interactive
+ Beam to reuse the same Dataproc cluster implicitly managed by the current
+ interactive environment.
+ If a flink_master is identified as a known cluster, the corresponding cluster
+ is also reused.
+ Furthermore, if a cluster is explicitly created by using a pipeline as an
+ identifier to a known cluster, the cluster is reused.
+
+ An example::
+
+ # dcm == ib.clusters.pipelines.get(pipeline), no cluster is newly created.
+ dcm = ib.clusters.create(pipeline)
+
+ To configure a pipeline to run on an existing FlinkRunner deployed elsewhere,
+ set the flink_master explicitly so no cluster will be created/reused.
+
+ An example pipeline options to skip automatic Dataproc cluster usage::
+
+ options = PipelineOptions([
+ '--flink_master=some.self.hosted.flink:port',
+ '--environment_type=DOCKER'])
+
+ To configure a pipeline to run on a local FlinkRunner, explicitly set the
+ default cluster metadata to None: ib.clusters.set_default_cluster(None).
"""
# Explicitly set the Flink version here to ensure compatibility with 2.0
# Dataproc images:
# https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0
DATAPROC_FLINK_VERSION = '1.12'
+
# TODO(BEAM-14142): Fix the Dataproc image version after a released image
# contains all missing dependencies for Flink to run.
# DATAPROC_IMAGE_VERSION = '2.0.XX-debian10'
- DATAPROC_STAGING_LOG_NAME = 'dataproc-startup-script_output'
def __init__(self) -> None:
- """Instantiates default values for Dataproc cluster interactions.
- """
- # Set the default_cluster_name that will be used when creating Dataproc
- # clusters.
- self.default_cluster_name = 'interactive-beam-cluster'
- # Bidirectional 1-1 mapping between master_urls (str) to cluster metadata
- # (MasterURLIdentifier), where self.master_urls.inverse is a mapping from
- # MasterURLIdentifier -> str.
- self.master_urls = bidict()
- # self.dataproc_cluster_managers map string pipeline ids to instances of
- # DataprocClusterManager.
- self.dataproc_cluster_managers = {}
- # self.master_urls_to_pipelines map string master_urls to lists of
- # pipelines that use the corresponding master_url.
- self.master_urls_to_pipelines: DefaultDict[
- str, List[beam.Pipeline]] = defaultdict(list)
- # self.master_urls_to_dashboards map string master_urls to the
- # corresponding Apache Flink dashboards.
- self.master_urls_to_dashboards: Dict[str, str] = {}
- # self.default_cluster_metadata for creating a DataprocClusterManager when
- # a pipeline has its cluster deleted from the clusters Jupyterlab
- # extension.
- self.default_cluster_metadata = None
-
- def describe(self, pipeline: Optional[beam.Pipeline] = None) -> dict:
- """Returns a description of the cluster associated to the given pipeline.
-
- If no pipeline is given then this returns a dictionary of descriptions for
- all pipelines, mapped to by id.
+ self.dataproc_cluster_managers: Dict[ClusterMetadata,
+ DataprocClusterManager] = {}
+ self.master_urls: Dict[str, ClusterMetadata] = {}
+ self.pipelines: Dict[beam.Pipeline, DataprocClusterManager] = {}
+ self.default_cluster_metadata: Optional[ClusterMetadata] = None
+
+ def create(
+ self, cluster_identifier: ClusterIdentifier) -> DataprocClusterManager:
+ """Creates a Dataproc cluster manager provisioned for the cluster
+ identified. If the cluster is known, returns an existing cluster manager.
"""
- description = {
- pid: dcm.describe()
- for pid,
- dcm in self.dataproc_cluster_managers.items()
- }
- if pipeline:
- return description.get(str(id(pipeline)), None)
- return description
+ # Try to get some not-None cluster metadata.
+ cluster_metadata = self.cluster_metadata(cluster_identifier)
+ if not cluster_metadata:
+ raise ValueError(
+ 'Unknown cluster identifier: %s. Cannot create or reuse '
+ 'a Dataproc cluster.' % (cluster_identifier,))
+ elif cluster_metadata.region == 'global':
+ # The global region is unsupported as it will be eventually deprecated.
+ raise ValueError('Clusters in the global region are not supported.')
+ elif not cluster_metadata.region:
+ _LOGGER.info(
+ 'No region information was detected, defaulting Dataproc cluster '
+ 'region to: us-central1.')
+ cluster_metadata.region = 'us-central1'
+ # else use the provided region.
+ known_dcm = self.dataproc_cluster_managers.get(cluster_metadata, None)
+ if known_dcm:
+ return known_dcm
+ dcm = DataprocClusterManager(cluster_metadata)
+ dcm.create_flink_cluster()
+ # ClusterMetadata with derivative fields populated by the dcm.
+ derived_meta = dcm.cluster_metadata
+ self.dataproc_cluster_managers[derived_meta] = dcm
+ self.master_urls[derived_meta.master_url] = derived_meta
+ # Update the default cluster metadata to the one just created.
+ self.set_default_cluster(derived_meta)
+ return dcm
def cleanup(
- self, pipeline: Optional[beam.Pipeline] = None, force=False) -> None:
- """Attempt to cleanup the Dataproc Cluster corresponding to the given
- pipeline.
-
- If the cluster is not managed by interactive_beam, a corresponding cluster
- manager is not detected, and deletion is skipped.
-
- For clusters managed by Interactive Beam, by default, deletion is skipped
- if any other pipelines are using the cluster.
-
- Optionally, the cleanup for a cluster managed by Interactive Beam can be
- forced, by setting the 'force' parameter to True.
-
- Example usage of default cleanup::
- interactive_beam.clusters.cleanup(p)
-
- Example usage of forceful cleanup::
- interactive_beam.clusters.cleanup(p, force=True)
+ self, cluster_identifier: Optional[ClusterIdentifier] = None) -> None:
+ """Cleans up the cluster associated with the given cluster_identifier.
+
+ If no cluster_identifier is provided, cleans up all clusters.
+ If a beam.Pipeline is given as the ClusterIdentifier while multiple
+ pipelines share the same cluster, it only cleans up the association between
+ the pipeline and the cluster identified.
+ If the cluster_identifier is unknown, NOOP.
"""
- if pipeline:
- cluster_manager = self.dataproc_cluster_managers.get(
- str(id(pipeline)), None)
- if cluster_manager:
- master_url = cluster_manager.master_url
- if len(self.master_urls_to_pipelines[master_url]) > 1:
- if force:
- _LOGGER.warning(
- 'Cluster is currently being used by another cluster, but '
- 'will be forcefully cleaned up.')
- cluster_manager.cleanup()
- else:
- _LOGGER.warning(
- 'Cluster is currently being used, skipping deletion.')
- self.master_urls_to_pipelines[master_url].remove(str(id(pipeline)))
- else:
- cluster_manager.cleanup()
- self.master_urls.pop(master_url, None)
- self.master_urls_to_pipelines.pop(master_url, None)
- self.master_urls_to_dashboards.pop(master_url, None)
- self.dataproc_cluster_managers.pop(str(id(pipeline)), None)
+ if not cluster_identifier: # Cleans up everything.
+ for dcm in set(self.dataproc_cluster_managers.values()):
+ self._cleanup(dcm)
+ self.default_cluster_metadata = None
Review Comment:
Feels dangerous to give users such an easy way to clean up all cluster
managers at once. Maybe we should keep the `force` argument and only clean up
everything if it's true?
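
A rough sketch of what a `force` guard could look like, assuming a simplified stand-in for `Clusters`. The class name `ForceGuardedClusters`, the string-keyed `managers` dict, and the `cleaned` list are hypothetical. Raising `ValueError` is only one option; logging a warning and returning would also satisfy the suggestion:

```python
from typing import Dict, List, Optional


class ForceGuardedClusters:
  """Hypothetical stand-in for Clusters; managers are plain strings here."""
  def __init__(self) -> None:
    self.managers: Dict[str, str] = {}
    self.cleaned: List[str] = []  # Records what was cleaned up, for inspection.

  def cleanup(
      self, cluster_identifier: Optional[str] = None,
      force: bool = False) -> None:
    if cluster_identifier is None:
      if not force:
        # Refuse the destructive path unless the caller opts in explicitly.
        raise ValueError(
            'No cluster_identifier given; pass force=True to clean up all '
            'clusters.')
      for key in list(self.managers):
        self._cleanup(key)
    elif cluster_identifier in self.managers:
      self._cleanup(cluster_identifier)
    # Unknown identifiers fall through as a no-op.

  def _cleanup(self, key: str) -> None:
    self.managers.pop(key)
    self.cleaned.append(key)
```

Here `cleanup()` with no arguments fails loudly, `cleanup('a')` removes one cluster, and only `cleanup(force=True)` removes everything.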
##########
sdks/python/apache_beam/runners/interactive/interactive_beam.py:
##########
@@ -339,151 +342,229 @@ def record(self, pipeline):
class Clusters:
- """An interface for users to modify the pipelines that are being run by the
- Interactive Environment.
+ """An interface to manage clusters running workers that are connected with
Review Comment:
This file is starting to get pretty big. It doesn't have to be in this PR,
but we may want to start investigating moving some of this logic to sub-modules.
##########
sdks/python/apache_beam/runners/interactive/interactive_runner.py:
##########
@@ -224,77 +218,45 @@ def visit_transform(self, transform_node):
return main_job_result
- # TODO(victorhc): Move this method somewhere else if performance is impacted
- # by generating a cluster during runtime.
- def _get_dataproc_cluster_master_url_if_applicable(
- self, user_pipeline: beam.Pipeline) -> str:
- """ Creates a Dataproc cluster if the provided user_pipeline is running
- FlinkRunner and no flink_master_url was provided as an option. A cluster
- is not created when a flink_master_url is detected.
-
- Example pipeline options to enable automatic Dataproc cluster creation:
- options = PipelineOptions([
- '--runner=FlinkRunner',
- '--project=my-project',
- '--region=my-region',
- '--environment_type=DOCKER'
- ])
-
- Example pipeline options to skip automatic Dataproc cluster creation:
- options = PipelineOptions([
- '--runner=FlinkRunner',
- '--flink_master=example.internal:41979',
- '--environment_type=DOCKER'
- ])
+ def tune_for_flink(
Review Comment:
"tune" generally means "optimize" in this context. But looking through the
method, maybe "configure" is better?
##########
sdks/python/apache_beam/runners/interactive/testing/mock_env.py:
##########
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module of mocks to isolate the test environment for each Interactive Beam
+test.
+"""
+
+import unittest
+import uuid
+from unittest.mock import patch
+
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager
+from apache_beam.runners.interactive.interactive_environment import InteractiveEnvironment
+from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython
+
+
+def isolated_env(cls: unittest.TestCase):
Review Comment:
Pretty cool, can we make this the default for tests and instead have a
decorator to turn this off? Isolated and idempotent unit tests should be the
default.
##########
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py:
##########
@@ -191,72 +175,60 @@ def cleanup(self) -> None:
'Failed to delete cluster: %s', self.cluster_metadata.cluster_name)
raise e
- def describe(self) -> None:
- """Returns a dictionary describing the cluster."""
- return {
- 'cluster_metadata': self.cluster_metadata,
- 'master_url': self.master_url,
- 'dashboard': self.dashboard
- }
-
- def get_cluster_details(
- self, cluster_metadata: MasterURLIdentifier) -> dataproc_v1.Cluster:
+ def get_cluster_details(self) -> dataproc_v1.Cluster:
"""Gets the Dataproc_v1 Cluster object for the current cluster manager."""
try:
return self._cluster_client.get_cluster(
request={
- 'project_id': cluster_metadata.project_id,
- 'region': cluster_metadata.region,
- 'cluster_name': cluster_metadata.cluster_name
+ 'project_id': self.cluster_metadata.project_id,
+ 'region': self.cluster_metadata.region,
+ 'cluster_name': self.cluster_metadata.cluster_name
})
except Exception as e:
if e.code == 403:
_LOGGER.error(
'Due to insufficient project permissions, '
'unable to retrieve information for cluster: %s',
- cluster_metadata.cluster_name)
+ self.cluster_metadata.cluster_name)
raise ValueError(
'You cannot view clusters in project: {}'.format(
- cluster_metadata.project_id))
+ self.cluster_metadata.project_id))
elif e.code == 404:
_LOGGER.error(
- 'Cluster does not exist: %s', cluster_metadata.cluster_name)
+ 'Cluster does not exist: %s', self.cluster_metadata.cluster_name)
raise ValueError(
- 'Cluster was not found: {}'.format(cluster_metadata.cluster_name))
+ 'Cluster was not found: {}'.format(
+ self.cluster_metadata.cluster_name))
else:
_LOGGER.error(
'Failed to get information for cluster: %s',
- cluster_metadata.cluster_name)
+ self.cluster_metadata.cluster_name)
raise e
- def wait_for_cluster_to_provision(
- self, cluster_metadata: MasterURLIdentifier) -> None:
- while self.get_cluster_details(
- cluster_metadata).status.state.name == 'CREATING':
+ def wait_for_cluster_to_provision(self) -> None:
+ while self.get_cluster_details().status.state.name == 'CREATING':
time.sleep(15)
Review Comment:
Will this always exit this loop?
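
As written in the diff, the loop ends once the state is anything other than 'CREATING' or once `get_cluster_details` raises, so it only spins forever if the cluster stays in 'CREATING'. A minimal sketch of a bounded wait follows, assuming the state is exposed through a callable. The function name `wait_while_creating`, its parameters, and the 600-second default are hypothetical:

```python
import time
from typing import Callable


def wait_while_creating(
    get_state: Callable[[], str],
    timeout_secs: float = 600.0,
    poll_secs: float = 15.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic) -> str:
  """Polls get_state until it leaves 'CREATING' or the timeout elapses.

  Returns the first state that is not 'CREATING'. Raises TimeoutError if the
  deadline passes first. sleep and clock are injectable for testing.
  """
  deadline = clock() + timeout_secs
  state = get_state()
  while state == 'CREATING':
    if clock() >= deadline:
      raise TimeoutError(
          'Cluster still in CREATING state after %s seconds.' % timeout_secs)
    sleep(poll_secs)
    state = get_state()
  return state
```

In the cluster manager, `get_state` could be `lambda: self.get_cluster_details().status.state.name`, which keeps the existing 15-second polling interval while guaranteeing the loop terminates.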
Issue Time Tracking
-------------------
Worklog Id: (was: 765727)
Time Spent: 40m (was: 0.5h)
> Improve the workflow of cluster management for Flink on Dataproc
> ----------------------------------------------------------------
>
> Key: BEAM-14332
> URL: https://issues.apache.org/jira/browse/BEAM-14332
> Project: Beam
> Issue Type: Improvement
> Components: runner-py-interactive
> Reporter: Ning
> Assignee: Ning
> Priority: P2
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Improve the workflow of cluster management.
> There is an option to configure a default [cluster
> name|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/interactive_beam.py#L366].
> The existing user flows are:
> # Use the default cluster name to create a new cluster if none is in use;
> # Reuse a created cluster that has the default cluster name;
> # If the default cluster name is configured to a new value, re-apply 1 and 2.
> A better solution is to
> # Create a new cluster implicitly if there is none or explicitly if the user
> wants one with specific provisioning;
> # Always default to using the last created cluster.
> The reasons are:
> * Cluster name is meaningless to the user when a cluster is just a medium to
> run OSS runners (as applications) such as Flink or Spark. The cluster could
> also be running anywhere (on GCP) such as Dataproc, k8s, or even Dataflow
> itself.
> * Clusters should be uniquely identified, thus should always have a distinct
> name. Clusters are managed (created/reused/deleted) behind the scenes by the
> notebook runtime when the user doesn’t explicitly do so (the capability to
> explicitly manage clusters is still available). Reusing the same default
> cluster name is risky when a cluster is deleted by one notebook runtime while
> another cluster with the same name is created by a different notebook
> runtime.
> * Provide the capability for the user to explicitly provision a cluster.
> Current implementation provisions each cluster at the location specified by
> GoogleCloudOptions using 3 worker nodes. There is no explicit API to
> configure the number or shape of workers.
> We could use the WorkerOptions to allow customers to explicitly provision a
> cluster and expose an explicit API (with UX in notebook extension) for
> customers to change the size of a cluster connected with their notebook
> (until we have an auto scaling solution with Dataproc for Flink).