This is an automated email from the ASF dual-hosted git repository.
mikhail pushed a commit to branch release-2.17.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.17.0 by this push:
new 9f32479 [BEAM-8835] Disable Flink Uber Jar by default. (#10270)
new 7148e2e Merge pull request #10274 from robertwb/release-2.17.0
9f32479 is described below
commit 9f32479944d36a41e73f40f72fce2c9ef536439b
Author: Robert Bradshaw <[email protected]>
AuthorDate: Tue Dec 3 13:50:58 2019 -0800
[BEAM-8835] Disable Flink Uber Jar by default. (#10270)
---
.../apache_beam/runners/portability/flink_runner.py | 18 +++++++++++++++---
1 file changed, 15 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner.py
b/sdks/python/apache_beam/runners/portability/flink_runner.py
index 296c7bc..c438dfc 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner.py
@@ -46,14 +46,19 @@ class FlinkRunner(portable_runner.PortableRunner):
flink_master = self.add_http_scheme(
options.view_as(FlinkRunnerOptions).flink_master)
options.view_as(FlinkRunnerOptions).flink_master = flink_master
- if flink_master in MAGIC_HOST_NAMES or sys.version_info < (3, 6):
- return job_server.StopOnExitJobServer(FlinkJarJobServer(options))
- else:
+ if (options.view_as(FlinkRunnerOptions).flink_submit_uber_jar
+ and flink_master not in MAGIC_HOST_NAMES):
+ if sys.version_info < (3, 6):
+ raise ValueError(
+ 'flink_submit_uber_jar requires Python 3.6+, current version %s'
+ % sys.version)
# This has to be changed to [auto], otherwise we will attempt to submit
# the pipeline remotely on the Flink JobMaster which will _fail_.
# DO NOT CHANGE the following line, unless you have tested this.
options.view_as(FlinkRunnerOptions).flink_master = '[auto]'
return flink_uber_jar_job_server.FlinkUberJarJobServer(flink_master)
+ else:
+ return job_server.StopOnExitJobServer(FlinkJarJobServer(options))
@staticmethod
def add_http_scheme(flink_master):
@@ -84,6 +89,13 @@ class FlinkRunnerOptions(pipeline_options.PipelineOptions):
parser.add_argument('--flink_job_server_jar',
help='Path or URL to a flink jobserver jar.')
parser.add_argument('--artifacts_dir', default=None)
+ parser.add_argument('--flink_submit_uber_jar',
+ default=False,
+ action='store_true',
+ help='Create and upload an uberjar to the flink master'
+ ' directly, rather than starting up a job server.'
+ ' Only applies when flink_master is set to a'
+ ' cluster address. Requires Python 3.6+.')
class FlinkJarJobServer(job_server.JavaJarJobServer):