This is an automated email from the ASF dual-hosted git repository.
ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f68ae5ff467 [BEAM-14449] Support cluster provisioning when using Flink
on Dataproc (#17736)
f68ae5ff467 is described below
commit f68ae5ff467c2a52f49aacd8f357aa210ec80354
Author: Ning Kang <[email protected]>
AuthorDate: Tue May 31 11:02:24 2022 -0700
[BEAM-14449] Support cluster provisioning when using Flink on Dataproc
(#17736)
* [BEAM-14449] Support cluster provisioning when using Flink on Dataproc
- Added support for provisioning a cluster for Flink on Dataproc.
- Fixed the Jackson "Module provider not a subtype" issue when
running Beam pipelines on EMRs by explicitly declaring the jaxb dependency
in the flink_runner build file as a runtimeOnly dependency.
---
runners/flink/flink_runner.gradle | 1 +
.../dataproc/dataproc_cluster_manager.py | 20 +++++++++
.../runners/interactive/dataproc/types.py | 39 ++++++++++++++++--
.../runners/interactive/interactive_beam.py | 41 +++++++++++++++---
.../runners/interactive/interactive_beam_test.py | 6 +++
.../runners/interactive/interactive_runner.py | 34 +++++++++++++--
.../runners/interactive/interactive_runner_test.py | 48 +++++++++++++++++++++-
7 files changed, 177 insertions(+), 12 deletions(-)
diff --git a/runners/flink/flink_runner.gradle
b/runners/flink/flink_runner.gradle
index 156e1a12e82..fde66993981 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -207,6 +207,7 @@ dependencies {
implementation project(path: ":model:job-management", configuration:
"shadow")
implementation project(":sdks:java:fn-execution")
implementation library.java.jackson_databind
+ runtimeOnly library.java.jackson_jaxb_annotations
implementation "org.apache.flink:flink-annotations:$flink_version"
examplesJavaIntegrationTest project(project.path)
examplesJavaIntegrationTest project(":examples:java")
diff --git
a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
index 2e2007abc79..7332e59c339 100644
---
a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
+++
b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
@@ -136,6 +136,11 @@ class DataprocClusterManager:
'https://www.googleapis.com/auth/cloud-platform'
]
},
+ 'master_config': {
+ # There must be 1 and only 1 instance of master.
+ 'num_instances': 1
+ },
+ 'worker_config': {},
'endpoint_config': {
'enable_http_port_access': True
}
@@ -145,6 +150,21 @@ class DataprocClusterManager:
'.', '_')
}
}
+
+ # Additional gce_cluster_config.
+ gce_cluster_config = cluster['config']['gce_cluster_config']
+ if self.cluster_metadata.subnetwork:
+ gce_cluster_config['subnetwork_uri'] = self.cluster_metadata.subnetwork
+
+ # Additional InstanceGroupConfig for master and workers.
+ master_config = cluster['config']['master_config']
+ worker_config = cluster['config']['worker_config']
+ if self.cluster_metadata.num_workers:
+ worker_config['num_instances'] = self.cluster_metadata.num_workers
+ if self.cluster_metadata.machine_type:
+ master_config['machine_type_uri'] = self.cluster_metadata.machine_type
+ worker_config['machine_type_uri'] = self.cluster_metadata.machine_type
+
self.create_cluster(cluster)
def cleanup(self) -> None:
diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/types.py
b/sdks/python/apache_beam/runners/interactive/dataproc/types.py
index ed2400e48f1..86f255c754f 100644
--- a/sdks/python/apache_beam/runners/interactive/dataproc/types.py
+++ b/sdks/python/apache_beam/runners/interactive/dataproc/types.py
@@ -26,21 +26,51 @@ from typing import Union
from apache_beam.pipeline import Pipeline
-def _default_cluster_name():
+def _generate_unique_cluster_name():
return f'interactive-beam-{uuid.uuid4().hex}'
@dataclass
class ClusterMetadata:
+ """Metadata of a provisioned worker cluster that executes Beam pipelines.
+
+ Apache Beam supports running Beam pipelines on different runners provisioned
+ in different setups based on the runner and pipeline options associated with
+ each pipeline. To provide similar portability features, Interactive Beam
+ automatically extracts such ClusterMetadata information from pipeline options
+ of a pipeline in the REPL context and provisions suitable clusters to execute
+ the pipeline. The lifecycle of the clusters is managed by Interactive Beam
+ and the user does not need to interact with it.
+
+ It's not recommended to build this ClusterMetadata from raw values nor use it
+ to interact with the cluster management logic directly.
+
+ Interactive Beam now supports::
+
+ 1. Runner: FlinkRunner; Setup: on Google Cloud with Flink on Dataproc.
+
+ """
project_id: Optional[str] = None
region: Optional[str] = None
- cluster_name: Optional[str] = field(default_factory=_default_cluster_name)
+ cluster_name: Optional[str] = field(
+ default_factory=_generate_unique_cluster_name)
+ # From WorkerOptions.
+ subnetwork: Optional[str] = None
+ num_workers: Optional[int] = None
+ machine_type: Optional[int] = None
+
# Derivative fields do not affect hash or comparison.
master_url: Optional[str] = None
dashboard: Optional[str] = None
def __key(self):
- return (self.project_id, self.region, self.cluster_name)
+ return (
+ self.project_id,
+ self.region,
+ self.cluster_name,
+ self.subnetwork,
+ self.num_workers,
+ self.machine_type)
def __hash__(self):
return hash(self.__key())
@@ -50,5 +80,8 @@ class ClusterMetadata:
return self.__key() == other.__key()
return False
+ def reset_name(self):
+ self.cluster_name = _generate_unique_cluster_name()
+
ClusterIdentifier = Union[str, Pipeline, ClusterMetadata]
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
index 7f9cf232b6a..9db657e65b5 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
@@ -379,6 +379,12 @@ class Clusters:
# manager without creating a new one.
dcm = ib.clusters.create(pipeline)
+ To provision the cluster, use WorkerOptions. Supported configurations are::
+
+ 1. subnetwork
+ 2. num_workers
+ 3. machine_type
+
To configure a pipeline to run on an existing FlinkRunner deployed elsewhere,
set the flink_master explicitly so no cluster will be created/reused.
@@ -396,6 +402,9 @@ class Clusters:
#
https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0
DATAPROC_FLINK_VERSION = '1.12'
+ # The minimum worker number to create a Dataproc cluster.
+ DATAPROC_MINIMUM_WORKER_NUM = 2
+
# TODO(BEAM-14142): Fix the Dataproc image version after a released image
# contains all missing dependencies for Flink to run.
# DATAPROC_IMAGE_VERSION = '2.0.XX-debian10'
@@ -418,15 +427,22 @@ class Clusters:
raise ValueError(
'Unknown cluster identifier: %s. Cannot create or reuse'
'a Dataproc cluster.')
- elif cluster_metadata.region == 'global':
- # The global region is unsupported as it will be eventually deprecated.
- raise ValueError('Clusters in the global region are not supported.')
- elif not cluster_metadata.region:
+ if not cluster_metadata.region:
_LOGGER.info(
'No region information was detected, defaulting Dataproc cluster '
'region to: us-central1.')
cluster_metadata.region = 'us-central1'
+ elif cluster_metadata.region == 'global':
+ # The global region is unsupported as it will be eventually deprecated.
+ raise ValueError('Clusters in the global region are not supported.')
# else use the provided region.
+ if (cluster_metadata.num_workers and
+ cluster_metadata.num_workers < self.DATAPROC_MINIMUM_WORKER_NUM):
+ _LOGGER.info(
+ 'At least %s workers are required for a cluster, defaulting to %s.',
+ self.DATAPROC_MINIMUM_WORKER_NUM,
+ self.DATAPROC_MINIMUM_WORKER_NUM)
+ cluster_metadata.num_workers = self.DATAPROC_MINIMUM_WORKER_NUM
known_dcm = self.dataproc_cluster_managers.get(cluster_metadata, None)
if known_dcm:
return known_dcm
@@ -475,7 +491,9 @@ class Clusters:
'options is deprecated since First stable release. References to '
'<pipeline>.options will not be supported',
category=DeprecationWarning)
- p.options.view_as(FlinkRunnerOptions).flink_master = '[auto]'
+ p_flink_options = p.options.view_as(FlinkRunnerOptions)
+ p_flink_options.flink_master = '[auto]'
+ p_flink_options.flink_version = None
# Only cleans up when there is no pipeline using the cluster.
if not dcm.pipelines:
self._cleanup(dcm)
@@ -558,6 +576,19 @@ class Clusters:
meta = cluster_identifier
if meta in self.dataproc_cluster_managers:
meta = self.dataproc_cluster_managers[meta].cluster_metadata
+ elif (meta and self.default_cluster_metadata and
+ meta.cluster_name == self.default_cluster_metadata.cluster_name):
+ _LOGGER.warning(
+ 'Cannot change the configuration of the running cluster %s. '
+ 'Existing is %s, desired is %s.',
+ self.default_cluster_metadata.cluster_name,
+ self.default_cluster_metadata,
+ meta)
+ meta.reset_name()
+ _LOGGER.warning(
+ 'To avoid conflict, issuing a new cluster name %s '
+ 'for a new cluster.',
+ meta.cluster_name)
else:
raise TypeError(
'A cluster_identifier should be Optional[Union[str, '
diff --git
a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
index c5dd0ea99e0..f0967bba453 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
@@ -563,6 +563,12 @@ class InteractiveBeamClustersTest(unittest.TestCase):
meta_list = self.clusters.describe(cid_meta)
self.assertEqual([known_meta, known_meta2], meta_list)
+ def test_default_value_for_invalid_worker_number(self):
+ meta = ClusterMetadata(project_id='test-project', num_workers=1)
+ self.clusters.create(meta)
+
+ self.assertEqual(meta.num_workers, 2)
+
if __name__ == '__main__':
unittest.main()
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index b3909df769a..04338015c50 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -29,6 +29,7 @@ from apache_beam import runners
from apache_beam.options.pipeline_options import FlinkRunnerOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct import direct_runner
from apache_beam.runners.interactive import interactive_environment as ie
@@ -220,7 +221,12 @@ class InteractiveRunner(runners.PipelineRunner):
def configure_for_flink(
self, user_pipeline: beam.Pipeline, options: PipelineOptions) -> None:
- """Tunes the pipeline options for the setup of running a job with Flink.
+ """Configures the pipeline options for running a job with Flink.
+
+ When running with a FlinkRunner, a job server started from an uber jar
+ (locally built or remotely downloaded) hosting the beam_job_api will
+ communicate with the Flink cluster located at the given flink_master in the
+ pipeline options.
"""
clusters = ie.current_env().clusters
if clusters.pipelines.get(user_pipeline, None):
@@ -243,6 +249,8 @@ class InteractiveRunner(runners.PipelineRunner):
# Generate the metadata with a new unique cluster name.
cluster_metadata = ClusterMetadata(
project_id=project_id, region=region)
+ # Add additional configurations.
+ self._worker_options_to_cluster_metadata(options, cluster_metadata)
# else use the default cluster metadata.
elif flink_master in clusters.master_urls:
cluster_metadata = clusters.cluster_metadata(flink_master)
@@ -254,9 +262,29 @@ class InteractiveRunner(runners.PipelineRunner):
# Side effects associated with the user_pipeline.
clusters.pipelines[user_pipeline] = dcm
dcm.pipelines.add(user_pipeline)
+ self._configure_flink_options(
+ options,
+ clusters.DATAPROC_FLINK_VERSION,
+ dcm.cluster_metadata.master_url)
+
+ def _worker_options_to_cluster_metadata(
+ self, options: PipelineOptions, cluster_metadata: ClusterMetadata):
+ worker_options = options.view_as(WorkerOptions)
+ if worker_options.subnetwork:
+ cluster_metadata.subnetwork = worker_options.subnetwork
+ if worker_options.num_workers:
+ cluster_metadata.num_workers = worker_options.num_workers
+ if worker_options.machine_type:
+ cluster_metadata.machine_type = worker_options.machine_type
+
+ def _configure_flink_options(
+ self, options: PipelineOptions, flink_version: str, master_url: str):
flink_options = options.view_as(FlinkRunnerOptions)
- flink_options.flink_master = dcm.cluster_metadata.master_url
- flink_options.flink_version = clusters.DATAPROC_FLINK_VERSION
+ flink_options.flink_version = flink_version
+ # flink_options.flink_job_server_jar will be populated by the
+ # apache_beam.utils.subprocess_server.JavaJarServer.path_to_beam_jar,
+ # do not populate it explicitly.
+ flink_options.flink_master = master_url
class PipelineResult(beam.runners.runner.PipelineResult):
diff --git
a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index d6ead00b692..d5bc475c921 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -33,6 +33,7 @@ from apache_beam.dataframe.convert import to_dataframe
from apache_beam.options.pipeline_options import FlinkRunnerOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.runners.direct import direct_runner
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive import interactive_environment as ie
@@ -490,7 +491,7 @@ class InteractiveRunnerTest(unittest.TestCase):
not ie.current_env().is_interactive_ready,
'[interactive] dependency is not installed.')
@isolated_env
-class TuneForFlinkTest(unittest.TestCase):
+class ConfigForFlinkTest(unittest.TestCase):
def test_create_a_new_cluster_for_a_new_pipeline(self):
clusters = self.current_env.clusters
runner = interactive_runner.InteractiveRunner(
@@ -585,6 +586,51 @@ class TuneForFlinkTest(unittest.TestCase):
self.assertEqual(
flink_options.flink_version, clusters.DATAPROC_FLINK_VERSION)
+ def test_worker_options_to_cluster_metadata(self):
+ clusters = self.current_env.clusters
+ runner = interactive_runner.InteractiveRunner(
+ underlying_runner=FlinkRunner())
+ options = PipelineOptions(project='test-project', region='test-region')
+ worker_options = options.view_as(WorkerOptions)
+ worker_options.num_workers = 2
+ worker_options.subnetwork = 'test-network'
+ worker_options.machine_type = 'test-machine-type'
+ p = beam.Pipeline(runner=runner, options=options)
+ runner.configure_for_flink(p, options)
+
+ configured_meta = clusters.cluster_metadata(p)
+ self.assertEqual(configured_meta.num_workers, worker_options.num_workers)
+ self.assertEqual(configured_meta.subnetwork, worker_options.subnetwork)
+ self.assertEqual(configured_meta.machine_type, worker_options.machine_type)
+
+ def test_configure_flink_options(self):
+ clusters = self.current_env.clusters
+ runner = interactive_runner.InteractiveRunner(
+ underlying_runner=FlinkRunner())
+ options = PipelineOptions(project='test-project', region='test-region')
+ p = beam.Pipeline(runner=runner, options=options)
+ runner.configure_for_flink(p, options)
+
+ flink_options = options.view_as(FlinkRunnerOptions)
+ self.assertEqual(
+ flink_options.flink_version, clusters.DATAPROC_FLINK_VERSION)
+ self.assertTrue(flink_options.flink_master.startswith('test-url-'))
+
+ def test_configure_flink_options_with_flink_version_overridden(self):
+ clusters = self.current_env.clusters
+ runner = interactive_runner.InteractiveRunner(
+ underlying_runner=FlinkRunner())
+ options = PipelineOptions(project='test-project', region='test-region')
+ flink_options = options.view_as(FlinkRunnerOptions)
+ flink_options.flink_version = 'test-version'
+ p = beam.Pipeline(runner=runner, options=options)
+ runner.configure_for_flink(p, options)
+
+ # The version is overridden to the flink version used by the EMR solution,
+ # currently only 1: Cloud Dataproc.
+ self.assertEqual(
+ flink_options.flink_version, clusters.DATAPROC_FLINK_VERSION)
+
if __name__ == '__main__':
unittest.main()