Sounds good to me. One thing I don't understand is what it means for "CLI or REST API context [to be] present." Where does this context come from? A config file in a standard location on the user's machine? Or is this something that is only present when a user uploads a jar and then Flink runs it in a specific context? Or both?
On Tue, Oct 29, 2019 at 3:27 AM Maximilian Michels <m...@apache.org> wrote: > > tl;dr: > > - I see consensus for inferring "http://" in Python to align it with the > Java behavior which currently requires leaving out the protocol scheme. > Optionally, Java could also accept a scheme which gets removed as > required by the Flink Java Rest client. > > - We won't support "https://" in Python for now, because it requires > additional SSL setup, i.e. parsing the Flink config file and setting up > the truststore > > - We want to keep "[auto]"/"[local]" but fix the current broken behavior > via https://issues.apache.org/jira/browse/BEAM-8507 > > > Additional comments below: > > > One concern with this is that just supplying host:port is the existing > > behavior, so we can't start requiring the http:// > > I wouldn't require it but optionally support it, otherwise add it > automatically. > > > One question I have is if there are other > > authentication parameters that may be required for speaking to a flink > > endpoint that we should be aware of (that would normally be buried in > > the config file). > > Yes, essentially all the SSL configuration is in the config file, > including the location of the truststore, the password, certificates, etc. > > For now, I would say we cannot properly support SSL in Python, unless we > find a way to load the truststore from Python. > > > I do like being explicit with something like [local] rather than > > treating the empty string in a magical way. > > Fine, we can keep "[local]" and throw an error in case the address is > empty. Let's also throw an error in case the Flink CLI tool is used with > local execution because that is clearly not what the user wants. > > >> I'd like to see this issue resolved before 2.17 as changing the public API > >> once it's released will be harder. > > > > +1. In particular, I misunderstood that [auto] is not supported by > > `FlinkUberJarJobServer`. 
Since [auto] is now the default, it's broken for > > Python 3.6+. > > +1 Let's fix this in time for the release. > > > The user shouldn't have to specify a protocol for Python, I think it's > > preferable and reasonable to handle that for them in order to maintain > > existing behavior and align with Java SDK. > > +1 > > > Looks like we also have a [collection] configuration value [1]. > > Yeah, I think it is acceptable to remove this entirely. This has never > > been used by anyone and is an unmaintained Flink feature. > > > I would find it acceptable to interpret absence of the option as "[auto]", > > which really means use CLI or REST API context when present, or local. I > > would prefer to not have an empty string value default (but rather > > None/null) and no additional magic values. > > Let's keep "[auto]" then to keep it explicit. An empty string should > > throw an error. > > > > One more reason that was not part of this discussion yet. > > @Jan: Supporting context classloaders in local mode is a new feature and > > for keeping it simple, I'd start a new thread for it. > > > On 29.10.19 10:55, Jan Lukavský wrote: > > Hi, > > > > +1 for empty string being interpreted as [auto] and anything else having > > explicit notation. > > > > One more reason that was not part of this discussion yet. In [1] there > > was a discussion about LocalEnvironment (that is the one that is > > responsible for spawning an in-process Flink cluster) not using the context > > classloader and thus failing to load some user code (if this code was > > added to the context classloader *after* the application has been run). > > LocalEnvironment on the other hand supposes that all classes can be > > loaded by the application's classloader and doesn't accept any "client > > jars". Therefore, when an application generates classes dynamically during > > runtime it is currently impossible to run those using the local Flink > > runner.
There is a nasty hack for JDK <= 8 (injecting a URL into the > > application's URLClassLoader), but that fails hard on JDK >= 9 (obviously). > > > > The conclusion from that thread is that it could be solved by manually > > running MiniCluster (which will run on localhost:8081 by default) and > > then using this REST address for RemoteEnvironment so that the application > > would actually be submitted as if it were run on a remote cluster and > > therefore a dynamically generated JAR can be attached to it. > > > > That would mean that we can actually have two "local" modes - one using > > LocalEnvironment and one with a manual MiniCluster + RemoteEnvironment (if > > for whatever reason we would like to keep both modes of local operation). > > That could mean two masters - e.g. [local] and [local-over-remote] or > > something like that. > > > > Jan > > > > [1] > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ClassLoader-created-by-BlobLibraryCacheManager-is-not-using-context-classloader-tp32434.html > > > > On 10/29/19 5:50 AM, Thomas Weise wrote: > >> The current semantics of flink_master are tied to the Flink Java API. > >> The Flink client / Java API isn't a "REST API". It now uses the REST > >> API somewhere deep in RemoteEnvironment when the flink_master value is > >> host:port, but it does a lot of other things as well, such as parsing > >> config files and running local clusters.
> >> > >> A rest client to me is a thin wrapper around > >> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html > >> > >> > >> Different ways used in Beam to submit jobs to Flink: > >> > >> - classic Flink runner, using Flink Java client > >> - job server, using Flink Java client > >> - generated jar, using Flink Java client > >> - Flink CLI with generated jar, using Flink Java client > >> - Python "FlinkRunner" using *REST API* > >> > >> Since the last item does not submit through the Flink Java client, > >> there is a problem with the flink_master pipeline option. Though we > >> cannot change the option in a way that breaks existing setups. Like > >> Robert suggests, we could allow for optional URL syntax so that it can > >> be used with the REST API, in which case the code path that goes > >> through the Java client will have to disassemble such URL before > >> handing it to Flink. > >> > >> I would find it acceptable to interpret absence of the option as > >> "[auto]", which really means use CLI or REST API context when present, > >> or local. I would prefer to not have an empty string value default > >> (but rather None/null) and no additional magic values. > >> > >> Thomas > >> > >> > >> On Mon, Oct 28, 2019 at 4:16 PM Kyle Weaver <kcwea...@google.com > >> <mailto:kcwea...@google.com>> wrote: > >> > >> Filed https://issues.apache.org/jira/browse/BEAM-8507 for the > >> issue I mentioned. > >> > >> On Mon, Oct 28, 2019 at 4:12 PM Kyle Weaver <kcwea...@google.com > >> <mailto:kcwea...@google.com>> wrote: > >> > >> > I'd like to see this issue resolved before 2.17 as changing > >> the public API once it's released will be harder. > >> > >> +1. In particular, I misunderstood that [auto] is not > >> supported by `FlinkUberJarJobServer`. Since [auto] is now the > >> default, it's broken for Python 3.6+. > >> > >> requests.exceptions.InvalidURL: Failed to parse: [auto]/v1/config > >> > >> We definitely should fix that, if nothing else. 
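The InvalidURL failure above happens because the sentinel value is concatenated straight into the REST URL. A minimal sketch of the guard the Python runner would need (the function name and return convention are illustrative assumptions, not Beam's actual code):

```python
# Hypothetical sketch: normalize flink_master before the Python
# "FlinkRunner" builds REST URLs such as "<master>/v1/config".
def rest_url_base(flink_master):
    if flink_master in ("[local]", "[auto]", ""):
        # No REST endpoint to contact; the caller should fall back to
        # local execution instead of issuing an HTTP request.
        return None
    if not flink_master.startswith(("http://", "https://")):
        # Accept a bare "host:port" for parity with the Java SDK.
        return "http://" + flink_master
    return flink_master
```

With a guard like this, "[auto]" and "[local]" never reach the HTTP layer, while a bare "host:port" keeps working unchanged.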
> >> > >> > One concern with this is that just supplying host:port is > >> the existing > >> > behavior, so we can't start requiring the http://. > >> > >> The user shouldn't have to specify a protocol for Python, I > >> think it's preferable and reasonable to handle that for them > >> in order to maintain existing behavior and align with Java SDK. > >> > >> > 2. Deprecate the "[auto]" and "[local]" values. It should be > >> sufficient > >> > to have either a non-empty address string or an empty one. > >> The empty > >> > string would either mean local execution or, in the context > >> of the Flink > >> > CLI tool, loading the master address from the config. The > >> non-empty > >> > string would be interpreted as a cluster address. > >> > >> Looks like we also have a [collection] configuration value [1]. > >> > >> If we're using [auto] as the default, I don't think this > >> really makes so much of a difference (as long as we're > >> supporting and documenting these properly, of course). I'm not > >> sure there's a compelling reason to change this? > >> > >> > always run locally (the least surprising to me > >> > >> I agree a local cluster should remain the default, whether > >> that is achieved through [local] or [auto] or some new > >> mechanism such as the above. > >> > >> [1] > >> > >> https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L80 > >> > >> On Mon, Oct 28, 2019 at 6:35 AM Maximilian Michels > >> <m...@apache.org <mailto:m...@apache.org>> wrote: > >> > >> Hi, > >> > >> Robert and Kyle have been doing great work to simplify > >> submitting > >> portable pipelines with the Flink Runner. Part of this is > >> having a > >> Python "FlinkRunner" which handles bringing up a Beam job > >> server and > >> submitting the pipeline directly via the Flink REST API. 
> >> One building > >> block is the creation of "executable Jars" which contain the > >> materialized / translated Flink pipeline and do not > >> require the Beam job > >> server or the Python driver anymore. > >> > >> While unifying a newly introduced option > >> "flink_master_url" with the > >> pre-existing "flink_master" [1][2], some questions came up > >> about Flink's > >> execution modes. (The two options are meant to do the same > >> thing: > >> provide the address of the Flink master to hand over the > >> translated > >> pipeline.) > >> > >> Historically, Flink had a proprietary protocol for > >> submitting pipelines, > >> running on port 9091. This has since been replaced with a > >> REST protocol > >> at port 8081. To this day, this has implications for how you > >> submit > >> programs, e.g. the Flink client libraries expect the > >> address to be of the > >> form "host:port", without a protocol scheme. On the other > >> hand, external > >> REST libraries typically expect a protocol scheme. > >> > >> But this is only half of the fun. There are also special > >> addresses for > >> "flink_master" that influence submission of the pipeline. > >> If you specify > >> "[local]" as the address, the pipeline won't be submitted > >> but executed > >> in a local in-process Flink cluster. If you specify > >> "[auto]" and you use > >> the CLI tool that comes bundled with Flink, then the > >> master address will > >> be loaded from the Flink config, including any > >> configuration like SSL. > >> If none is found, then it falls back to "[local]". > >> > >> This is a bit odd, and after a discussion with Robert and > >> Thomas in [1], > >> we figured that this needs to be changed: > >> > >> 1. Make the master address a URL. Add "http://" to > >> "flink_master" in > >> Python if no scheme is specified. Similarly, remove any > >> "http://" in > >> Java, since the Java REST client does not expect a scheme.
> >> In case of > >> "http_s_://", we have a special treatment to load the SSL > >> settings from > >> the Flink config. > >> > >> 2. Deprecate the "[auto]" and "[local]" values. It should > >> be sufficient > >> to have either a non-empty address string or an empty one. > >> The empty > >> string would either mean local execution or, in the > >> context of the Flink > >> CLI tool, loading the master address from the config. The > >> non-empty > >> string would be interpreted as a cluster address. > >> > >> > >> Any opinions on this? > >> > >> > >> Thanks, > >> Max > >> > >> > >> [1] https://github.com/apache/beam/pull/9803 > >> [2] https://github.com/apache/beam/pull/9844 > >>
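The Java-side half of point 1 can be sketched as follows (the function is a hypothetical illustration, not the actual implementation): the scheme is stripped because the Flink Java REST client expects a bare "host:port", and "https://" additionally signals that SSL settings must be loaded from the Flink config (not shown here).

```python
# Illustrative sketch of proposal point 1, not actual Beam code:
# normalize a user-supplied flink_master for the Flink Java client.
def for_java_client(flink_master):
    # Remember whether SSL setup is needed before dropping the scheme.
    use_ssl = flink_master.startswith("https://")
    for scheme in ("https://", "http://"):
        if flink_master.startswith(scheme):
            # The Java REST client wants "host:port" without a scheme.
            return flink_master[len(scheme):], use_ssl
    return flink_master, use_ssl
```

A "host:port" value passes through untouched, so existing setups keep working whether or not the user adds a scheme.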