Sounds good to me.

One thing I don't understand is what it means for "CLI or REST API
context [to be] present." Where does this context come from? A config
file in a standard location on the user's machine? Or is this
something that is only present when a user uploads a jar and then
Flink runs it in a specific context? Or both?

On Tue, Oct 29, 2019 at 3:27 AM Maximilian Michels <m...@apache.org> wrote:
>
> tl;dr:
>
> - I see consensus for inferring "http://"; in Python to align it with the
> Java behavior which currently requires leaving out the protocol scheme.
> Optionally, Java could also accept a scheme which gets removed as
> required by the Flink Java Rest client.
>
> - We won't support "https://"; in Python for now, because it requires
> additional SSL setup, i.e. parsing the Flink config file and setting up
> the truststore
>
> - We want to keep "[auto]"/"[local]" but fix the current broken behavior
> via https://issues.apache.org/jira/browse/BEAM-8507
>
>
> Additional comments below:
>
> > One concern with this is that just supplying host:port is the existing
> > behavior, so we can't start requiring the http://
>
> I wouldn't require it but optionally support it, otherwise add it
> automatically.
>
> > One question I have is if there are other
> > authentication parameters that may be required for speaking to a flink
> > endpoint that we should be aware of (that would normally be buried in
> > the config file).
>
> Yes, essentially all the SSL configuration is in the config file,
> including the location of the truststore, the password, certificates, etc.
>
> For now, I would say we cannot properly support SSL in Python, unless we
> find a way to load the truststore from Python.
>
> > I do like being explicit with something like [local] rather than
> > treating the empty string in a magical way.
>
> Fine, we can keep "[local]" and throw an error in case the address is
> empty. Let's also throw an error in case the Flink CLI tool is used with
> local execution because that is clearly not what the user wants.
>
> >> I'd like to see this issue resolved before 2.17 as changing the public API 
> >> once it's released will be harder.
> >
> > +1. In particular, I misunderstood that [auto] is not supported by 
> > `FlinkUberJarJobServer`. Since [auto] is now the default, it's broken for 
> > Python 3.6+.
>
> +1 Let's fix this in time for the release.
>
> > The user shouldn't have to specify a protocol for Python, I think it's 
> > preferable and reasonable to handle that for them in order to maintain 
> > existing behavior and align with Java SDK.
>
> +1
>
> > Looks like we also have a [collection] configuration value [1].
>
> Yeah, I think it is acceptable to remove this entirely. This has never
> been used by anyone and is an unmaintained Flink feature.
>
> > I would find it acceptable to interpret absence of the option as "[auto]", 
> > which really means use CLI or REST API context when present, or local. I 
> > would prefer to not have an empty string value default (but rather 
> > None/null) and no additional magic values.
>
> Let's keep "[auto]" then to keep it explicit. An empty string should
> throw an error.
>
>
> > One more reason that was not part of this discussion yet.
>
> @Jan: Supporting context classloaders in local mode is a new feature and
> for keeping it simple, I'd start a new thread for it.
>
>
> On 29.10.19 10:55, Jan Lukavský wrote:
> > Hi,
> >
> > +1 for empty string being interpreted as [auto] and anything else having
> > explicit notation.
> >
> > One more reason that was not part of this discussion yet. In [1] there
> > was a discussion about LocalEnvironment (that is the one that is
> > responsible for spawning in process Flink cluster) not using context
> > classloader and thus can fail loading some user code (if this code was
> > added to context classloader *after* application has been run).
> > LocalEnvironment on the other hand supposes that all classes can be
> > loaded by applicaiton's classloader and doesn't accept any "client
> > jars". Therefore - when application generates classes dynamically during
> > runtime it is currently impossible to run those using local flink
> > runner. There is a nasty hack for JDK <= 8 (injecting URL into
> > applications URLClassLoader), but that fails hard on JDK >= 9 (obviously).
> >
> > The conclusion from that thread is that it could be solved by manually
> > running MiniCluster (which will run on localhost:8081 by default) and
> > then use this REST address for RemoteEnvironment so that the application
> > would be actually submitted as if it would be run on remote cluster and
> > therefore a dynamically generated JAR can be attached to it.
> >
> > That would mean, that we can actually have two "local" modes - one using
> > LocalEnvironment and one with manual MiniCluster + RemoteEnvironment (if
> > for whatever reason we would like to keep both mode of local operation).
> > That could mean two masters - e.g. [local] and [local-over-remote] or
> > something like that.
> >
> > Jan
> >
> > [1]
> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ClassLoader-created-by-BlobLibraryCacheManager-is-not-using-context-classloader-tp32434.html
> >
> > On 10/29/19 5:50 AM, Thomas Weise wrote:
> >> The current semantics of flink_master are tied to the Flink Java API.
> >> The Flink client / Java API isn't a "REST API". It now uses the REST
> >> API somewhere deep in RemoteEnvironment when the flink_master value is
> >> host:port, but it does a lot of other things as well, such are parsing
> >> config files and running local clusters.
> >>
> >> A rest client to me is a thin wrapper around
> >> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html
> >>
> >>
> >> Different ways used in Beam to submit jobs to Flink:
> >>
> >> - classic Flink runner, using Flink Java client
> >> - job server, using Flink Java client
> >> - generated jar, using Flink Java client
> >> - Flink CLI with generated jar, using Flink Java client
> >> - Python "FlinkRunner" using *REST API*
> >>
> >> Since the last item does not submit through the Flink Java client,
> >> there is a problem with the flink_master pipeline option. Though we
> >> cannot change the option in a way that breaks existing setups. Like
> >> Robert suggests, we could allow for optional URL syntax so that it can
> >> be used with the REST API, in which case the code path that goes
> >> through the Java client will have to disassemble such URL before
> >> handing it to Flink.
> >>
> >> I would find it acceptable to interpret absence of the option as
> >> "[auto]", which really means use CLI or REST API context when present,
> >> or local. I would prefer to not have an empty string value default
> >> (but rather None/null) and no additional magic values.
> >>
> >> Thomas
> >>
> >>
> >> On Mon, Oct 28, 2019 at 4:16 PM Kyle Weaver <kcwea...@google.com
> >> <mailto:kcwea...@google.com>> wrote:
> >>
> >>     Filed https://issues.apache.org/jira/browse/BEAM-8507 for the
> >>     issue I mentioned.
> >>
> >>     On Mon, Oct 28, 2019 at 4:12 PM Kyle Weaver <kcwea...@google.com
> >>     <mailto:kcwea...@google.com>> wrote:
> >>
> >>         > I'd like to see this issue resolved before 2.17 as changing
> >>         the public API once it's released will be harder.
> >>
> >>         +1. In particular, I misunderstood that [auto] is not
> >>         supported by `FlinkUberJarJobServer`. Since [auto] is now the
> >>         default, it's broken for Python 3.6+.
> >>
> >>         requests.exceptions.InvalidURL: Failed to parse: [auto]/v1/config
> >>
> >>         We definitely should fix that, if nothing else.
> >>
> >>         > One concern with this is that just supplying host:port is
> >>         the existing
> >>         > behavior, so we can't start requiring the http://.
> >>
> >>         The user shouldn't have to specify a protocol for Python, I
> >>         think it's preferable and reasonable to handle that for them
> >>         in order to maintain existing behavior and align with Java SDK.
> >>
> >>         > 2. Deprecate the "[auto]" and "[local]" values. It should be
> >>         sufficient
> >>         > to have either a non-empty address string or an empty one.
> >>         The empty
> >>         > string would either mean local execution or, in the context
> >>         of the Flink
> >>         > CLI tool, loading the master address from the config. The
> >>         non-empty
> >>         > string would be interpreted as a cluster address.
> >>
> >>         Looks like we also have a [collection] configuration value [1].
> >>
> >>         If we're using [auto] as the default, I don't think this
> >>         really makes so much of a difference (as long as we're
> >>         supporting and documenting these properly, of course). I'm not
> >>         sure there's a compelling reason to change this?
> >>
> >>         > always run locally (the least surprising to me
> >>
> >>         I agree a local cluster should remain the default, whether
> >>         that is achieved through [local] or [auto] or some new
> >>         mechanism such as the above.
> >>
> >>         [1]
> >>         
> >> https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L80
> >>
> >>         On Mon, Oct 28, 2019 at 6:35 AM Maximilian Michels
> >>         <m...@apache.org <mailto:m...@apache.org>> wrote:
> >>
> >>             Hi,
> >>
> >>             Robert and Kyle have been doing great work to simplify
> >>             submitting
> >>             portable pipelines with the Flink Runner. Part of this is
> >>             having a
> >>             Python "FlinkRunner" which handles bringing up a Beam job
> >>             server and
> >>             submitting the pipeline directly via the Flink REST API.
> >>             One building
> >>             block is the creation of "executable Jars" which contain the
> >>             materialized / translated Flink pipeline and do not
> >>             require the Beam job
> >>             server or the Python driver anymore.
> >>
> >>             While unifying a newly introduced option
> >>             "flink_master_url" with the
> >>             pre-existing "flink_master" [1][2], some questions came up
> >>             about Flink's
> >>             execution modes. (The two options are meant to do the same
> >>             thing:
> >>             provide the address of the Flink master to hand-over the
> >>             translated
> >>             pipeline.)
> >>
> >>             Historically, Flink had a proprietary protocol for
> >>             submitting pipelines,
> >>             running on port 9091. This has since been replaced with a
> >>             REST protocol
> >>             at port 8081. To this date, this has implications how you
> >>             submit
> >>             programs, e.g. the Flink client libraries expects the
> >>             address to be of
> >>             form "host:port", without a protocol scheme. On the other
> >>             hand, external
> >>             Rest libraries typically expect a protocol scheme.
> >>
> >>             But this is only half of the fun. There are also special
> >>             addresses for
> >>             "flink_master" that influence submission of the pipeline.
> >>             If you specify
> >>             "[local]" as the address, the pipeline won't be submitted
> >>             but executed
> >>             in a local in-process Flink cluster. If you specify
> >>             "[auto]" and you use
> >>             the CLI tool that comes bundled with Flink, then the
> >>             master address will
> >>             be loaded from the Flink config, including any
> >>             configuration like SSL.
> >>             If none is found, then it falls back to "[local]".
> >>
> >>             This is a bit odd, and after a discussion with Robert and
> >>             Thomas in [1],
> >>             we figured that this needs to be changed:
> >>
> >>             1. Make the master address a URL. Add "http://"; to
> >>             "flink_master" in
> >>             Python if no scheme is specified. Similarly, remove any
> >>             "http://"; in
> >>             Java, since the Java rest client does not expect a scheme.
> >>             In case of
> >>             "http_s_://", we have a special treatment to load the SSL
> >>             settings from
> >>             the Flink config.
> >>
> >>             2. Deprecate the "[auto]" and "[local]" values. It should
> >>             be sufficient
> >>             to have either a non-empty address string or an empty one.
> >>             The empty
> >>             string would either mean local execution or, in the
> >>             context of the Flink
> >>             CLI tool, loading the master address from the config. The
> >>             non-empty
> >>             string would be interpreted as a cluster address.
> >>
> >>
> >>             Any opinions on this?
> >>
> >>
> >>             Thanks,
> >>             Max
> >>
> >>
> >>             [1] https://github.com/apache/beam/pull/9803
> >>             [2] https://github.com/apache/beam/pull/9844
> >>

Reply via email to