Hi,
Robert and Kyle have been doing great work to simplify submitting
portable pipelines with the Flink Runner. Part of this is a Python
"FlinkRunner" that brings up a Beam job server and submits the pipeline
directly via the Flink REST API. One building block is the creation of
"executable Jars", which contain the materialized (translated) Flink
pipeline and no longer require the Beam job server or the Python driver.
While unifying a newly introduced option, "flink_master_url", with the
pre-existing "flink_master" [1][2], some questions came up about Flink's
execution modes. (Both options are meant to do the same thing: provide
the address of the Flink master to which the translated pipeline is
handed over.)
Historically, Flink had a proprietary protocol for submitting pipelines,
running on port 9091. This has since been replaced with a REST protocol
on port 8081. To this day, this affects how you submit programs: the
Flink client libraries expect the address in the form "host:port",
without a protocol scheme, whereas external REST libraries typically
expect a scheme (e.g. "http://host:port").
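To illustrate the mismatch, here is a minimal Python sketch that derives the scheme-less "host:port" form expected by the Flink client libraries from a REST-style URL. The helper name `to_client_address` is hypothetical and not part of Beam or Flink; it only relies on `urllib.parse` from the standard library.

```python
from urllib.parse import urlsplit


def to_client_address(rest_url: str) -> str:
    """Turn a REST-style URL such as "http://host:8081" into "host:8081".

    Hypothetical helper for illustration only; not a Beam or Flink API.
    """
    parts = urlsplit(rest_url)
    host = parts.hostname
    if not host:
        # Without a scheme, urlsplit does not populate the network location.
        raise ValueError(f"expected a URL with a scheme, got {rest_url!r}")
    if ":" in host:
        # Re-bracket IPv6 literals so the port separator stays unambiguous.
        host = f"[{host}]"
    # parts.port is None when the URL carries no explicit port.
    if parts.port is None:
        return host
    return f"{host}:{parts.port}"


print(to_client_address("http://localhost:8081"))  # localhost:8081
```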
But this is only half of the fun. There are also special addresses for
"flink_master" that influence how the pipeline is submitted. If you
specify "[local]" as the address, the pipeline won't be submitted but
will instead be executed in a local, in-process Flink cluster. If you
specify "[auto]" and use the CLI tool that comes bundled with Flink, the
master address will be loaded from the Flink config, along with related
settings such as SSL. If no address is found there, it falls back to
"[local]".
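The current handling of these special values can be summarized with the following Python sketch. It models the behavior described above rather than Beam's actual code: the function name and its parameters (`config_address` for an address found in the Flink config, `via_flink_cli` for submission through the bundled CLI) are hypothetical, and treating "[auto]" outside the CLI as local execution is an assumption.

```python
from typing import Optional, Tuple


def resolve_legacy_master(
    flink_master: str,
    config_address: Optional[str] = None,
    via_flink_cli: bool = False,
) -> Tuple[str, Optional[str]]:
    """Return ("local", None) or ("remote", address) for a flink_master value.

    Hypothetical model of the current "[local]" / "[auto]" semantics.
    """
    if flink_master == "[local]":
        # Run in an in-process local Flink cluster; nothing is submitted.
        return ("local", None)
    if flink_master == "[auto]":
        # Only the bundled Flink CLI loads the master address from the config.
        if via_flink_cli and config_address:
            return ("remote", config_address)
        # No configured address (or no CLI): fall back to "[local]".
        return ("local", None)
    # Any other value is taken as the cluster address.
    return ("remote", flink_master)
```

For example, `resolve_legacy_master("[auto]", config_address="jobmanager:8081", via_flink_cli=True)` yields `("remote", "jobmanager:8081")`, while the same call without `via_flink_cli=True` silently yields local execution, which is part of what makes these values confusing.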
This is a bit odd, and after a discussion with Robert and Thomas in [1],
we figured that this needs to be changed:
1. Make the master address a URL. In Python, add "http://" to
"flink_master" if no scheme is specified. Conversely, remove any
"http://" in Java, since the Java REST client does not expect a scheme.
For "https://" (note the "s"), we apply special treatment and load the
SSL settings from the Flink config.
2. Deprecate the "[auto]" and "[local]" values. It should be sufficient
to distinguish between a non-empty and an empty address string. An
empty string would mean either local execution or, in the context of the
Flink CLI tool, loading the master address from the config. A non-empty
string would be interpreted as the cluster address.
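Taken together, the two proposals could look roughly like the following Python sketch. All names here are hypothetical and only illustrate the intended semantics: `normalize_for_python` adds "http://" when no scheme is given, `strip_for_java_client` mimics (in Python) what the Java side would do before handing the address to its REST client, and `resolve_master` maps an empty string to local execution, or to the config address when going through the Flink CLI. An "https://" scheme only sets a flag meaning "load the SSL settings from the Flink config"; no actual SSL handling is modeled.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class MasterTarget:
    """Hypothetical result of resolving flink_master under the proposal."""
    address: Optional[str]  # "host:port" for the Java client; None means local
    use_ssl: bool = False   # True: load SSL settings from the Flink config


def normalize_for_python(flink_master: str) -> str:
    """Proposal 1, Python side: add "http://" if no scheme is present."""
    if flink_master and "://" not in flink_master:
        return "http://" + flink_master
    return flink_master


def strip_for_java_client(url: str) -> MasterTarget:
    """Proposal 1, Java side (modeled in Python): drop the scheme."""
    for scheme, ssl in (("https://", True), ("http://", False)):
        if url.startswith(scheme):
            return MasterTarget(url[len(scheme):].rstrip("/"), use_ssl=ssl)
    return MasterTarget(url.rstrip("/"))


def resolve_master(flink_master: str,
                   config_address: Optional[str] = None,
                   via_flink_cli: bool = False) -> MasterTarget:
    """Proposal 2: only empty vs. non-empty matters."""
    if not flink_master:
        # Empty: the Flink CLI loads the address from the config if present;
        # otherwise the pipeline runs locally.
        if via_flink_cli and config_address:
            return strip_for_java_client(normalize_for_python(config_address))
        return MasterTarget(None)
    # Non-empty: always the cluster address.
    return strip_for_java_client(normalize_for_python(flink_master))
```

With this shape, `resolve_master("localhost:8081")` and `resolve_master("http://localhost:8081")` resolve to the same target, so users no longer need to know which convention a given component expects.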
Any opinions on this?
Thanks,
Max
[1] https://github.com/apache/beam/pull/9803
[2] https://github.com/apache/beam/pull/9844