One thing I don't understand is what it means for "CLI or REST API
context [to be] present." Where does this context come from? A config
file in a standard location on the user's machine? Or is this
something that is only present when a user uploads a jar and then
Flink runs it in a specific context? Or both?

When you upload a Jar to Flink, it can be run by the Flink master. Running a Jar on the job manager simply invokes its main method. The same happens when you use the Flink CLI tool; the only difference is that the Jar runs on the user's machine instead of on the Flink master. In both cases, the Flink config file will be present in a standard location, i.e. "<flink_installation>/conf/flink-conf.yaml".

What Flink users typically do is construct a Flink dataflow (a.k.a. job) like this:
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.addSource(...).flatMap(...);
  env.execute();
This registers an environment which holds the Flink dataflow definition.

The Flink Runner also does this when flink_master is set to "[auto]". When it is set to "[local]", it will attempt to execute the whole pipeline locally (LocalEnvironment). When an address has been specified, it will submit against a remote Flink cluster (RemoteEnvironment). The last two cases do not make sense for users of the Python FlinkRunner, which uses Flink's jar upload feature. That's why "[auto]" is our choice for the FlinkUberJarJobServer feature of the Python FlinkRunner.

In the case of the FlinkJarJobServer, using "[auto]" can also make sense because you can specify a Flink config directory for the Flink job server to use. Without a config, "[auto]" will always fall back to local execution.
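To summarize the resolution rules in one place, a rough sketch (hypothetical helper name and return values, for illustration only; the real logic lives in Flink's environment factories):

```python
def resolve_master(flink_master, config_address=None):
    """Sketch of how "[auto]"/"[local]"/address could be resolved.

    config_address: master address found in flink-conf.yaml, if any.
    """
    if flink_master == "[local]":
        return ("local", None)
    if flink_master == "[auto]":
        if config_address:
            # CLI / REST API context present: use the configured master.
            return ("remote", config_address)
        # No config found: fall back to local execution.
        return ("local", None)
    # Any other value is treated as a remote cluster address.
    return ("remote", flink_master)
```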

One more question: https://issues.apache.org/jira/browse/BEAM-8396
still seems valuable, but with [auto] as the default, how should we
detect whether LOOPBACK is safe to enable from Python?

Yes, it is valuable. I suspect we only want to enable it for local execution? We could let the actual Runner handle this by falling back to the default environment in case it detects that the execution will not be local. It could then simply tell Python to shut down the loopback server, or the server shuts itself down after a timeout. Another option would be to only support it when the mode is set to "[local]".
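Whichever option we pick, the Python-side check would be small; a sketch under the assumption that we key it off the flink_master value (names are made up):

```python
def pick_default_environment(flink_master, user_environment=None):
    """Choose a default SDK harness environment for the Flink runner.

    Sketch only: LOOPBACK is enabled automatically only when execution
    is guaranteed to stay local, i.e. flink_master == "[local]".
    """
    if user_environment is not None:
        # Never override an explicit user choice.
        return user_environment
    if flink_master == "[local]":
        # Runner and SDK harness share one machine, so LOOPBACK is safe.
        return "LOOPBACK"
    # "[auto]" or a remote address: fall back to Beam's default.
    return "DOCKER"
```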

-Max

On 30.10.19 21:05, Robert Bradshaw wrote:
One more question: https://issues.apache.org/jira/browse/BEAM-8396
still seems valuable, but with [auto] as the default, how should we
detect whether LOOPBACK is safe to enable from Python?

On Wed, Oct 30, 2019 at 11:53 AM Robert Bradshaw <rober...@google.com> wrote:

Sounds good to me.

One thing I don't understand is what it means for "CLI or REST API
context [to be] present." Where does this context come from? A config
file in a standard location on the user's machine? Or is this
something that is only present when a user uploads a jar and then
Flink runs it in a specific context? Or both?

On Tue, Oct 29, 2019 at 3:27 AM Maximilian Michels <m...@apache.org> wrote:

tl;dr:

- I see consensus for inferring "http://" in Python to align it with the
Java behavior, which currently requires leaving out the protocol scheme.
Optionally, Java could also accept a scheme which gets removed as
required by the Flink Java REST client.

- We won't support "https://" in Python for now, because it requires
additional SSL setup, i.e. parsing the Flink config file and setting up
the truststore.

- We want to keep "[auto]"/"[local]" but fix the current broken behavior
via https://issues.apache.org/jira/browse/BEAM-8507


Additional comments below:

One concern with this is that just supplying host:port is the existing
behavior, so we can't start requiring the http://

I wouldn't require it but optionally support it, otherwise add it
automatically.
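Concretely, something like this on the Python side (sketch; the helper name is made up):

```python
def with_scheme(flink_master):
    """Prepend "http://" to a bare host:port address.

    Explicit schemes and magic values like "[local]"/"[auto]" are
    passed through untouched. Sketch of the proposed behavior, not
    the actual Beam code.
    """
    if flink_master.startswith("[") or "://" in flink_master:
        return flink_master
    return "http://" + flink_master
```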

One question I have is if there are other
authentication parameters that may be required for speaking to a flink
endpoint that we should be aware of (that would normally be buried in
the config file).

Yes, essentially all the SSL configuration is in the config file,
including the location of the truststore, the password, certificates, etc.

For now, I would say we cannot properly support SSL in Python, unless we
find a way to load the truststore from Python.

I do like being explicit with something like [local] rather than
treating the empty string in a magical way.

Fine, we can keep "[local]" and throw an error in case the address is
empty. Let's also throw an error in case the Flink CLI tool is used with
local execution because that is clearly not what the user wants.

I'd like to see this issue resolved before 2.17 as changing the public API once 
it's released will be harder.

+1. In particular, I misunderstood that [auto] is not supported by 
`FlinkUberJarJobServer`. Since [auto] is now the default, it's broken for 
Python 3.6+.

+1 Let's fix this in time for the release.

The user shouldn't have to specify a protocol for Python, I think it's 
preferable and reasonable to handle that for them in order to maintain existing 
behavior and align with Java SDK.

+1

Looks like we also have a [collection] configuration value [1].

Yeah, I think it is acceptable to remove this entirely. This has never
been used by anyone and is an unmaintained Flink feature.

I would find it acceptable to interpret absence of the option as "[auto]", 
which really means use CLI or REST API context when present, or local. I would prefer to 
not have an empty string value default (but rather None/null) and no additional magic 
values.

Let's keep "[auto]" then to keep it explicit. An empty string should
throw an error.


One more reason that was not part of this discussion yet.

@Jan: Supporting context classloaders in local mode is a new feature and
for keeping it simple, I'd start a new thread for it.


On 29.10.19 10:55, Jan Lukavský wrote:
Hi,

+1 for empty string being interpreted as [auto] and anything else having
explicit notation.

One more reason that was not part of this discussion yet. In [1] there
was a discussion about LocalEnvironment (that is the one that is
responsible for spawning an in-process Flink cluster) not using the
context classloader and thus possibly failing to load some user code
(if this code was added to the context classloader *after* the
application has been run). LocalEnvironment assumes that all classes
can be loaded by the application's classloader and doesn't accept any
"client jars". Therefore, when an application generates classes
dynamically at runtime, it is currently impossible to run those using
the local Flink runner. There is a nasty hack for JDK <= 8 (injecting
a URL into the application's URLClassLoader), but that fails hard on
JDK >= 9 (obviously).

The conclusion from that thread is that it could be solved by manually
running MiniCluster (which will run on localhost:8081 by default) and
then use this REST address for RemoteEnvironment so that the application
would be actually submitted as if it would be run on remote cluster and
therefore a dynamically generated JAR can be attached to it.

That would mean, that we can actually have two "local" modes - one using
LocalEnvironment and one with manual MiniCluster + RemoteEnvironment (if
for whatever reason we would like to keep both mode of local operation).
That could mean two masters - e.g. [local] and [local-over-remote] or
something like that.

Jan

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ClassLoader-created-by-BlobLibraryCacheManager-is-not-using-context-classloader-tp32434.html

On 10/29/19 5:50 AM, Thomas Weise wrote:
The current semantics of flink_master are tied to the Flink Java API.
The Flink client / Java API isn't a "REST API". It now uses the REST
API somewhere deep in RemoteEnvironment when the flink_master value is
host:port, but it does a lot of other things as well, such as parsing
config files and running local clusters.

A rest client to me is a thin wrapper around
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html


Different ways used in Beam to submit jobs to Flink:

- classic Flink runner, using Flink Java client
- job server, using Flink Java client
- generated jar, using Flink Java client
- Flink CLI with generated jar, using Flink Java client
- Python "FlinkRunner" using *REST API*

Since the last item does not submit through the Flink Java client,
there is a problem with the flink_master pipeline option. Though we
cannot change the option in a way that breaks existing setups. Like
Robert suggests, we could allow for optional URL syntax so that it can
be used with the REST API, in which case the code path that goes
through the Java client will have to disassemble such URL before
handing it to Flink.
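That disassembly step could look roughly like this (sketched in Python for brevity; in reality it would happen on the Java side before handing the address to the Flink client):

```python
from urllib.parse import urlparse

def to_host_port(flink_master):
    """Strip an optional scheme so the Flink Java client gets bare host:port.

    Sketch of the proposed normalization; a "https" scheme would
    additionally trigger loading SSL settings from the Flink config,
    hence the second return value.
    """
    if "://" not in flink_master:
        return flink_master, False
    parsed = urlparse(flink_master)
    return "%s:%d" % (parsed.hostname, parsed.port), parsed.scheme == "https"
```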

I would find it acceptable to interpret absence of the option as
"[auto]", which really means use CLI or REST API context when present,
or local. I would prefer to not have an empty string value default
(but rather None/null) and no additional magic values.

Thomas


On Mon, Oct 28, 2019 at 4:16 PM Kyle Weaver <kcwea...@google.com
<mailto:kcwea...@google.com>> wrote:

     Filed https://issues.apache.org/jira/browse/BEAM-8507 for the
     issue I mentioned.

     On Mon, Oct 28, 2019 at 4:12 PM Kyle Weaver <kcwea...@google.com
     <mailto:kcwea...@google.com>> wrote:

         > I'd like to see this issue resolved before 2.17 as changing
         the public API once it's released will be harder.

         +1. In particular, I misunderstood that [auto] is not
         supported by `FlinkUberJarJobServer`. Since [auto] is now the
         default, it's broken for Python 3.6+.

         requests.exceptions.InvalidURL: Failed to parse: [auto]/v1/config

         We definitely should fix that, if nothing else.

         > One concern with this is that just supplying host:port is
         the existing
         > behavior, so we can't start requiring the http://.

         The user shouldn't have to specify a protocol for Python, I
         think it's preferable and reasonable to handle that for them
         in order to maintain existing behavior and align with Java SDK.

         > 2. Deprecate the "[auto]" and "[local]" values. It should be
         sufficient
         > to have either a non-empty address string or an empty one.
         The empty
         > string would either mean local execution or, in the context
         of the Flink
         > CLI tool, loading the master address from the config. The
         non-empty
         > string would be interpreted as a cluster address.

         Looks like we also have a [collection] configuration value [1].

         If we're using [auto] as the default, I don't think this
         really makes so much of a difference (as long as we're
         supporting and documenting these properly, of course). I'm not
         sure there's a compelling reason to change this?

         > always run locally (the least surprising to me

         I agree a local cluster should remain the default, whether
         that is achieved through [local] or [auto] or some new
         mechanism such as the above.

         [1]
https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L80

         On Mon, Oct 28, 2019 at 6:35 AM Maximilian Michels
         <m...@apache.org <mailto:m...@apache.org>> wrote:

             Hi,

             Robert and Kyle have been doing great work to simplify
             submitting
             portable pipelines with the Flink Runner. Part of this is
             having a
             Python "FlinkRunner" which handles bringing up a Beam job
             server and
             submitting the pipeline directly via the Flink REST API.
             One building
             block is the creation of "executable Jars" which contain the
             materialized / translated Flink pipeline and do not
             require the Beam job
             server or the Python driver anymore.

             While unifying a newly introduced option
             "flink_master_url" with the
             pre-existing "flink_master" [1][2], some questions came up
             about Flink's
             execution modes. (The two options are meant to do the same
             thing:
             provide the address of the Flink master to hand-over the
             translated
             pipeline.)

             Historically, Flink had a proprietary protocol for
             submitting pipelines,
              running on port 9091. This has since been replaced with a
              REST protocol at port 8081. To this date, this has
              implications for how you submit programs, e.g. the Flink
              client libraries expect the address to be of the form
              "host:port", without a protocol scheme. On the other hand,
              external REST libraries typically expect a protocol scheme.

             But this is only half of the fun. There are also special
             addresses for
             "flink_master" that influence submission of the pipeline.
             If you specify
             "[local]" as the address, the pipeline won't be submitted
             but executed
             in a local in-process Flink cluster. If you specify
             "[auto]" and you use
             the CLI tool that comes bundled with Flink, then the
             master address will
             be loaded from the Flink config, including any
             configuration like SSL.
             If none is found, then it falls back to "[local]".

             This is a bit odd, and after a discussion with Robert and
             Thomas in [1],
             we figured that this needs to be changed:

              1. Make the master address a URL. Add "http://" to
              "flink_master" in Python if no scheme is specified.
              Similarly, remove any "http://" in Java, since the Java
              REST client does not expect a scheme. In case of
              "http_s_://", we treat it specially and load the SSL
              settings from the Flink config.

             2. Deprecate the "[auto]" and "[local]" values. It should
             be sufficient
             to have either a non-empty address string or an empty one.
             The empty
             string would either mean local execution or, in the
             context of the Flink
             CLI tool, loading the master address from the config. The
             non-empty
             string would be interpreted as a cluster address.


             Any opinions on this?


             Thanks,
             Max


             [1] https://github.com/apache/beam/pull/9803
             [2] https://github.com/apache/beam/pull/9844
