The current semantics of flink_master are tied to the Flink Java API. The Flink client / Java API isn't a "REST API". It now uses the REST API somewhere deep in RemoteEnvironment when the flink_master value is host:port, but it does a lot of other things as well, such as parsing config files and running local clusters.

A REST client, to me, is a thin wrapper around
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html

Different ways used in Beam to submit jobs to Flink:

- classic Flink runner, using Flink Java client
- job server, using Flink Java client
- generated jar, using Flink Java client
- Flink CLI with generated jar, using Flink Java client
- Python "FlinkRunner" using *REST API*

Since the last item does not submit through the Flink Java client, there is a problem with the flink_master pipeline option. However, we cannot change the option in a way that breaks existing setups. As Robert suggests, we could allow optional URL syntax so that the option can be used with the REST API, in which case the code path that goes through the Java client will have to disassemble such a URL before handing it to Flink.

I would find it acceptable to interpret absence of the option as "[auto]", which really means: use the CLI or REST API context when present, otherwise run locally. I would prefer not to have an empty string as the default value (but rather None/null) and no additional magic values.

Thomas

On Mon, Oct 28, 2019 at 4:16 PM Kyle Weaver <kcwea...@google.com> wrote:

> Filed https://issues.apache.org/jira/browse/BEAM-8507 for the issue I
> mentioned.
>
> On Mon, Oct 28, 2019 at 4:12 PM Kyle Weaver <kcwea...@google.com> wrote:
>
>> > I'd like to see this issue resolved before 2.17 as changing the public
>> API once it's released will be harder.
>>
>> +1. In particular, I misunderstood that [auto] is not supported by
>> `FlinkUberJarJobServer`. Since [auto] is now the default, it's broken for
>> Python 3.6+.
>>
>> requests.exceptions.InvalidURL: Failed to parse: [auto]/v1/config
>>
>> We definitely should fix that, if nothing else.
>>
>> > One concern with this is that just supplying host:port is the existing
>> > behavior, so we can't start requiring the http://.
>>
>> The user shouldn't have to specify a protocol for Python, I think it's
>> preferable and reasonable to handle that for them in order to maintain
>> existing behavior and align with Java SDK.
>>
>> > 2. Deprecate the "[auto]" and "[local]" values. It should be sufficient
>> > to have either a non-empty address string or an empty one. The empty
>> > string would either mean local execution or, in the context of the Flink
>> > CLI tool, loading the master address from the config. The non-empty
>> > string would be interpreted as a cluster address.
>>
>> Looks like we also have a [collection] configuration value [1].
>>
>> If we're using [auto] as the default, I don't think this really makes so
>> much of a difference (as long as we're supporting and documenting these
>> properly, of course). I'm not sure there's a compelling reason to change
>> this?
>>
>> > always run locally (the least surprising to me
>>
>> I agree a local cluster should remain the default, whether that is
>> achieved through [local] or [auto] or some new mechanism such as the above.
>>
>> [1]
>> https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L80
>>
>> On Mon, Oct 28, 2019 at 6:35 AM Maximilian Michels <m...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> Robert and Kyle have been doing great work to simplify submitting
>>> portable pipelines with the Flink Runner. Part of this is having a
>>> Python "FlinkRunner" which handles bringing up a Beam job server and
>>> submitting the pipeline directly via the Flink REST API. One building
>>> block is the creation of "executable Jars" which contain the
>>> materialized / translated Flink pipeline and do not require the Beam job
>>> server or the Python driver anymore.
>>>
>>> While unifying a newly introduced option "flink_master_url" with the
>>> pre-existing "flink_master" [1][2], some questions came up about Flink's
>>> execution modes. (The two options are meant to do the same thing:
>>> provide the address of the Flink master to hand-over the translated
>>> pipeline.)
>>>
>>> Historically, Flink had a proprietary protocol for submitting pipelines,
>>> running on port 9091. This has since been replaced with a REST protocol
>>> at port 8081. To this date, this has implications how you submit
>>> programs, e.g. the Flink client libraries expects the address to be of
>>> form "host:port", without a protocol scheme. On the other hand, external
>>> Rest libraries typically expect a protocol scheme.
>>>
>>> But this is only half of the fun. There are also special addresses for
>>> "flink_master" that influence submission of the pipeline. If you specify
>>> "[local]" as the address, the pipeline won't be submitted but executed
>>> in a local in-process Flink cluster. If you specify "[auto]" and you use
>>> the CLI tool that comes bundled with Flink, then the master address will
>>> be loaded from the Flink config, including any configuration like SSL.
>>> If none is found, then it falls back to "[local]".
>>>
>>> This is a bit odd, and after a discussion with Robert and Thomas in [1],
>>> we figured that this needs to be changed:
>>>
>>> 1. Make the master address a URL. Add "http://" to "flink_master" in
>>> Python if no scheme is specified. Similarly, remove any "http://" in
>>> Java, since the Java rest client does not expect a scheme. In case of
>>> "http_s_://", we have a special treatment to load the SSL settings from
>>> the Flink config.
>>>
>>> 2. Deprecate the "[auto]" and "[local]" values. It should be sufficient
>>> to have either a non-empty address string or an empty one. The empty
>>> string would either mean local execution or, in the context of the Flink
>>> CLI tool, loading the master address from the config. The non-empty
>>> string would be interpreted as a cluster address.
>>>
>>>
>>> Any opinions on this?
>>>
>>>
>>> Thanks,
>>> Max
>>>
>>>
>>> [1] https://github.com/apache/beam/pull/9803
>>> [2] https://github.com/apache/beam/pull/9844
>>>
>>
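
For illustration only, here is a minimal Python sketch of the address handling discussed in this thread: adding "http://" for a REST client when no scheme is given, stripping the scheme for a client that expects host:port, and treating an absent option or a magic value as "no remote address". The names `MAGIC_VALUES`, `to_rest_url` and `to_host_port` are hypothetical and are not taken from the Beam code base; the real handling in Beam may well differ.

```python
from typing import Optional
from urllib.parse import urlsplit

# Special values mentioned in this thread; treated here as "no remote address".
MAGIC_VALUES = {"[auto]", "[local]", "[collection]"}


def to_rest_url(flink_master: Optional[str]) -> Optional[str]:
    """Return a base URL usable by a REST client, or None if there is no
    remote address (option absent, empty, or a magic value)."""
    if not flink_master or flink_master in MAGIC_VALUES:
        return None
    # Check for "://" explicitly instead of relying on urlsplit().scheme:
    # depending on the Python version, a bare "host:port" may be parsed
    # with the host as the scheme.
    if "://" not in flink_master:
        flink_master = "http://" + flink_master
    return flink_master.rstrip("/")


def to_host_port(flink_master: str) -> str:
    """Return "host:port" for a client that does not expect a scheme.
    Magic values and plain host:port strings are passed through unchanged."""
    if "://" in flink_master:
        return urlsplit(flink_master).netloc
    return flink_master
```

With something like this, a value such as "[auto]" would never be concatenated into a request path like "[auto]/v1/config"; the caller would see None and could fall back to local execution or to the CLI context instead.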