Hi,

Robert and Kyle have been doing great work to simplify submitting portable pipelines with the Flink Runner. Part of this is a Python "FlinkRunner" that brings up a Beam job server and submits the pipeline directly via the Flink REST API. One building block is the creation of "executable Jars", which contain the materialized (translated) Flink pipeline and no longer require the Beam job server or the Python driver.
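
For illustration, here is a minimal sketch of which REST endpoints such a jar submission would target. It assumes Flink's documented jar endpoints "POST /jars/upload" and "POST /jars/<jarid>/run". The function name "jar_submission_urls" and the jar id below are hypothetical, and no request is actually sent.

```python
from typing import Tuple
from urllib.parse import urljoin


def jar_submission_urls(master_url: str, jar_id: str) -> Tuple[str, str]:
    """Hypothetical helper: return the (upload, run) endpoint URLs.

    Assumes the Flink REST endpoints POST /jars/upload and
    POST /jars/<jar_id>/run. Only builds URLs; sends nothing.
    """
    # A trailing slash makes urljoin append relative paths instead of
    # replacing the last path segment of the base URL.
    base = master_url.rstrip("/") + "/"
    upload = urljoin(base, "jars/upload")
    run = urljoin(base, f"jars/{jar_id}/run")
    return upload, run


upload, run = jar_submission_urls("http://localhost:8081", "abc_pipeline.jar")
print(upload)  # http://localhost:8081/jars/upload
print(run)     # http://localhost:8081/jars/abc_pipeline.jar/run
```

Note that both URLs only make sense with a scheme in front of "host:port". In a real submission, the jar id would presumably be taken from the upload response rather than chosen by the client.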

While unifying a newly introduced option "flink_master_url" with the pre-existing "flink_master" [1][2], some questions came up about Flink's execution modes. (The two options are meant to do the same thing: provide the address of the Flink master to which the translated pipeline is handed over.)

Historically, Flink had a proprietary protocol for submitting pipelines, running on port 9091. This has since been replaced by a REST protocol on port 8081. To this day, this affects how you submit programs: for example, the Flink client libraries expect the address in the form "host:port", without a protocol scheme, whereas external REST libraries typically expect a scheme.
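
To make the mismatch concrete, here is a small sketch that converts a scheme-qualified REST URL into the bare "host:port" form the Flink client libraries expect. The helper name "to_client_address" is hypothetical, and falling back to port 8081 when none is given is an assumption based on the default REST port mentioned above.

```python
from urllib.parse import urlsplit

# Assumption: fall back to Flink's default REST port if the URL has none.
DEFAULT_REST_PORT = 8081


def to_client_address(url: str) -> str:
    """Hypothetical helper: turn "http://host:8081" into "host:8081"."""
    parts = urlsplit(url)
    # Without "scheme://", urlsplit finds no network location, so a bare
    # "host:port" string is rejected rather than silently misparsed.
    if not parts.netloc or parts.hostname is None:
        raise ValueError(f"expected a URL with a scheme, got {url!r}")
    host = parts.hostname
    if ":" in host:
        host = f"[{host}]"  # re-bracket IPv6 literals, which .hostname unwraps
    port = parts.port if parts.port is not None else DEFAULT_REST_PORT
    return f"{host}:{port}"


print(to_client_address("http://localhost:8081"))  # localhost:8081
```

Rejecting input without a scheme, instead of guessing, keeps the two address forms from being confused in either direction.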

But this is only half of the fun. There are also special addresses for "flink_master" that influence how the pipeline is submitted. If you specify "[local]" as the address, the pipeline won't be submitted but executed in a local, in-process Flink cluster. If you specify "[auto]" and you use the CLI tool that comes bundled with Flink, the master address will be loaded from the Flink config, including any configuration such as SSL. If none is found, it falls back to "[local]".
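
The current rules can be modeled roughly as follows. This is a hypothetical sketch, not Beam's actual code: "Mode", "resolve_current", "config_address" and "via_cli" are illustrative names, and treating "[auto]" outside the Flink CLI as local execution is an assumption.

```python
from enum import Enum
from typing import Optional, Tuple


class Mode(Enum):
    LOCAL = "local"    # in-process Flink cluster
    CONFIG = "config"  # master address loaded from the Flink config (CLI only)
    REMOTE = "remote"  # submit to the given address


def resolve_current(flink_master: str,
                    config_address: Optional[str] = None,
                    via_cli: bool = False) -> Tuple[Mode, Optional[str]]:
    """Hypothetical model of how "flink_master" is interpreted today."""
    if flink_master == "[local]":
        return Mode.LOCAL, None
    if flink_master == "[auto]":
        # Assumption: only the Flink CLI provides a config to read from;
        # otherwise, or if no address is configured, fall back to "[local]".
        if via_cli and config_address:
            return Mode.CONFIG, config_address
        return Mode.LOCAL, None
    # Any other value is taken as the address of a Flink cluster.
    return Mode.REMOTE, flink_master
```

Even in this simplified form, the same "[auto]" value resolves to different modes depending on how the program was launched.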

This is a bit odd, and after a discussion with Robert and Thomas in [1], we concluded that this needs to change:

1. Make the master address a URL. In Python, add "http://" to "flink_master" if no scheme is specified. Similarly, remove any "http://" in Java, since the Java REST client does not expect a scheme. For "https://", we apply special treatment and load the SSL settings from the Flink config.

2. Deprecate the "[auto]" and "[local]" values. It should be sufficient to have either a non-empty address string or an empty one. The empty string would mean either local execution or, in the context of the Flink CLI tool, loading the master address from the config. A non-empty string would be interpreted as a cluster address.
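
A rough sketch of the proposed Python-side handling, for discussion only. "Target" and "resolve_proposed" are hypothetical names, and mapping the deprecated "[auto]" and "[local]" values to the empty-string behavior (with a warning) is an assumption about how the deprecation could work.

```python
import warnings
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlsplit


@dataclass(frozen=True)
class Target:
    # None: run locally or, under the Flink CLI, read the address from the config.
    url: Optional[str] = None
    # True when the https scheme asks for SSL settings from the Flink config.
    use_ssl: bool = False


def resolve_proposed(flink_master: str) -> Target:
    """Hypothetical sketch of the proposed "flink_master" handling."""
    address = flink_master.strip()
    if address in ("[auto]", "[local]"):
        # Assumption: deprecated values behave like the empty string.
        warnings.warn(f'"{address}" is deprecated; use an empty string instead',
                      DeprecationWarning, stacklevel=2)
        address = ""
    if not address:
        return Target()
    if "://" not in address:
        # No scheme given: default to plain HTTP.
        address = "http://" + address
    scheme = urlsplit(address).scheme.lower()
    if scheme not in ("http", "https"):
        raise ValueError(f"unsupported scheme in {flink_master!r}")
    return Target(url=address, use_ssl=(scheme == "https"))


print(resolve_proposed("localhost:8081"))
# Target(url='http://localhost:8081', use_ssl=False)
```

With this shape, the only decision left to the caller is empty versus non-empty, plus the SSL flag derived from the scheme.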


Any opinions on this?


Thanks,
Max


[1] https://github.com/apache/beam/pull/9803
[2] https://github.com/apache/beam/pull/9844
