On Wed, Oct 30, 2019 at 3:34 PM Maximilian Michels <m...@apache.org> wrote:
>
> > One thing I don't understand is what it means for "CLI or REST API
> > context [to be] present." Where does this context come from? A config
> > file in a standard location on the user's machine? Or is this
> > something that is only present when a user uploads a jar and then
> > Flink runs it in a specific context? Or both?
>
> When you upload a Jar to Flink, it can be run by the Flink master.
> Running a Jar on the job manager will just invoke the main method. The
> same happens when you use the Flink CLI tool, the only difference being
> that the Jar runs on the user machine vs on the Flink master. In both
> cases, the Flink config file will be present in a standard location,
> i.e. "<flink_installation>/conf/flink-conf.yaml".
>
> What Flink users typically do is call the following to construct a
> Flink dataflow (a.k.a. job):
>    env = ExecutionEnvironment.getExecutionEnvironment()
>    env.addSource().flatMap()
>    env.execute()
> This registers an environment which holds the Flink dataflow definition.
>
> The Flink Runner also does this when flink_master is set to "[auto]".
> When it is set to "[local]", it will attempt to execute the whole
> pipeline locally (LocalEnvironment). When an address has been specified
> it will submit against a remote Flink cluster (RemoteEnvironment). The
> last two use cases do not make any sense for the user when they use the
> Python FlinkRunner which uses the jar upload feature of Flink. That's
> why "[auto]" is our choice for the FlinkUberJarJobServer feature of the
> Python FlinkRunner.
>
> For the FlinkJarJobServer, using "[auto]" can also make sense
> because you can even specify a Flink config directory for the Flink job
> server to use. Without a config, auto will always fall back to local
> execution.

Thanks for clarifying. So when I run "./flink my_pipeline.jar" or
upload the jar via the REST API (and its main method is invoked on the
master), then [auto] reads the config and does the right thing, but if
I do "java my_pipeline.jar" it'll run locally.
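
To check my understanding, here is a minimal sketch of how I read the
three cases. It is not the runner's actual code: the function name and
the config_master parameter are made up for illustration, and
config_master stands in for whatever address the Flink config provides
when a CLI or REST API context is present (None when there is none).

```python
def resolve_flink_master(flink_master, config_master=None):
    """Return a (mode, address) pair for a flink_master value.

    Hypothetical helper. config_master is an assumption: it models the
    master address found in the Flink config when the jar is run through
    the Flink CLI or the REST API, and is None otherwise.
    """
    if flink_master == "[local]":
        # Always execute in a local in-process cluster.
        return ("local", None)
    if flink_master == "[auto]":
        # Use the surrounding context if there is one, else fall back to local.
        if config_master:
            return ("remote", config_master)
        return ("local", None)
    if not flink_master:
        # The thread proposes that an empty string should be an error.
        raise ValueError("flink_master must not be empty; use [auto] or [local]")
    # Anything else is treated as the address of a remote cluster.
    return ("remote", flink_master)
```

With this reading, resolve_flink_master("[auto]") without any config
gives local execution, which matches the plain "java" case above.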

> > One more question: https://issues.apache.org/jira/browse/BEAM-8396
> > still seems valuable, but with [auto] as the default, how should we
> > detect whether LOOPBACK is safe to enable from Python?
>
> Yes, it is valuable. I suspect we only want to enable it for local
> execution?

Yes.

> We could let the actual Runner handle this by falling back to
> the default environment in case it detects that the execution will not
> be local. It can then simply tell Python to shut down the loopback
> server, or it shuts itself down after a timeout.

Python needs to know whether to even start up the loopback server, and
provide the address when submitting the pipeline. If I understood
correctly above, the only time that the job server interprets [auto]
as something other than [local] is when creating the jar for later
submission. (In this case the flink master isn't even used, other than
being baked into the jar, right? And baking anything in but [auto]
seems wrong...) So it seems we could guard the use of LOOPBACK on this
flag + [local] or [auto].
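
Roughly, the guard could look like the sketch below. Both the function
name and the creating_jar flag are placeholders I made up here; the
flag stands for "the pipeline is only being baked into a jar for later
submission", and nothing below is existing Beam code.

```python
def loopback_is_safe(flink_master, creating_jar=False):
    """Decide whether Python may start a LOOPBACK worker server.

    Hypothetical helper. creating_jar is an assumed flag meaning the job
    server only builds a jar for later submission, in which case the
    pipeline will not run next to the Python process.
    """
    if creating_jar:
        # The jar may be executed anywhere later, so loopback is unsafe.
        return False
    # Per the discussion above, [auto] resolves to local execution here.
    return flink_master in ("[local]", "[auto]")
```

An explicit host:port would then keep using the default environment.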

> Another option would
> be to only support it when the mode is set to "[local]".

Well, I'd really like to support it by default...

> On 30.10.19 21:05, Robert Bradshaw wrote:
> > One more question: https://issues.apache.org/jira/browse/BEAM-8396
> > still seems valuable, but with [auto] as the default, how should we
> > detect whether LOOPBACK is safe to enable from Python?
> >
> > On Wed, Oct 30, 2019 at 11:53 AM Robert Bradshaw <rober...@google.com> wrote:
> >>
> >> Sounds good to me.
> >>
> >> One thing I don't understand is what it means for "CLI or REST API
> >> context [to be] present." Where does this context come from? A config
> >> file in a standard location on the user's machine? Or is this
> >> something that is only present when a user uploads a jar and then
> >> Flink runs it in a specific context? Or both?
> >>
> >> On Tue, Oct 29, 2019 at 3:27 AM Maximilian Michels <m...@apache.org> wrote:
> >>>
> >>> tl;dr:
> >>>
> >>> - I see consensus for inferring "http://" in Python to align it with the
> >>> Java behavior which currently requires leaving out the protocol scheme.
> >>> Optionally, Java could also accept a scheme which gets removed as
> >>> required by the Flink Java Rest client.
> >>>
> >>> - We won't support "https://" in Python for now, because it requires
> >>> additional SSL setup, i.e. parsing the Flink config file and setting up
> >>> the truststore
> >>>
> >>> - We want to keep "[auto]"/"[local]" but fix the current broken behavior
> >>> via https://issues.apache.org/jira/browse/BEAM-8507
> >>>
> >>>
> >>> Additional comments below:
> >>>
> >>>> One concern with this is that just supplying host:port is the existing
> >>>> behavior, so we can't start requiring the http://
> >>>
> >>> I wouldn't require it but optionally support it, otherwise add it
> >>> automatically.
> >>>
> >>>> One question I have is if there are other
> >>>> authentication parameters that may be required for speaking to a flink
> >>>> endpoint that we should be aware of (that would normally be buried in
> >>>> the config file).
> >>>
> >>> Yes, essentially all the SSL configuration is in the config file,
> >>> including the location of the truststore, the password, certificates, etc.
> >>>
> >>> For now, I would say we cannot properly support SSL in Python, unless we
> >>> find a way to load the truststore from Python.
> >>>
> >>>> I do like being explicit with something like [local] rather than
> >>>> treating the empty string in a magical way.
> >>>
> >>> Fine, we can keep "[local]" and throw an error in case the address is
> >>> empty. Let's also throw an error in case the Flink CLI tool is used with
> >>> local execution because that is clearly not what the user wants.
> >>>
> >>>>> I'd like to see this issue resolved before 2.17 as changing the public 
> >>>>> API once it's released will be harder.
> >>>>
> >>>> +1. In particular, I misunderstood that [auto] is not supported by 
> >>>> `FlinkUberJarJobServer`. Since [auto] is now the default, it's broken 
> >>>> for Python 3.6+.
> >>>
> >>> +1 Let's fix this in time for the release.
> >>>
> >>>> The user shouldn't have to specify a protocol for Python, I think it's 
> >>>> preferable and reasonable to handle that for them in order to maintain 
> >>>> existing behavior and align with Java SDK.
> >>>
> >>> +1
> >>>
> >>>> Looks like we also have a [collection] configuration value [1].
> >>>
> >>> Yeah, I think it is acceptable to remove this entirely. This has never
> >>> been used by anyone and is an unmaintained Flink feature.
> >>>
> >>>> I would find it acceptable to interpret absence of the option as 
> >>>> "[auto]", which really means use CLI or REST API context when present, 
> >>>> or local. I would prefer to not have an empty string value default (but 
> >>>> rather None/null) and no additional magic values.
> >>>
> >>> Let's keep "[auto]" then to keep it explicit. An empty string should
> >>> throw an error.
> >>>
> >>>
> >>>> One more reason that was not part of this discussion yet.
> >>>
> >>> @Jan: Supporting context classloaders in local mode is a new feature and,
> >>> to keep it simple, I'd start a new thread for it.
> >>>
> >>>
> >>> On 29.10.19 10:55, Jan Lukavský wrote:
> >>>> Hi,
> >>>>
> >>>> +1 for empty string being interpreted as [auto] and anything else having
> >>>> explicit notation.
> >>>>
> >>>> One more reason that was not part of this discussion yet. In [1] there
> >>>> was a discussion about LocalEnvironment (the one that is
> >>>> responsible for spawning an in-process Flink cluster) not using the context
> >>>> classloader and thus possibly failing to load some user code (if this code
> >>>> was added to the context classloader *after* the application has been run).
> >>>> LocalEnvironment, on the other hand, assumes that all classes can be
> >>>> loaded by the application's classloader and doesn't accept any "client
> >>>> jars". Therefore, when an application generates classes dynamically at
> >>>> runtime, it is currently impossible to run those using the local Flink
> >>>> runner. There is a nasty hack for JDK <= 8 (injecting a URL into the
> >>>> application's URLClassLoader), but that fails hard on JDK >= 9
> >>>> (obviously).
> >>>>
> >>>> The conclusion from that thread is that it could be solved by manually
> >>>> running MiniCluster (which will run on localhost:8081 by default) and
> >>>> then using this REST address for RemoteEnvironment, so that the application
> >>>> would actually be submitted as if it were run on a remote cluster and
> >>>> therefore a dynamically generated JAR can be attached to it.
> >>>>
> >>>> That would mean that we can actually have two "local" modes - one using
> >>>> LocalEnvironment and one with manual MiniCluster + RemoteEnvironment (if
> >>>> for whatever reason we would like to keep both modes of local operation).
> >>>> That could mean two masters - e.g. [local] and [local-over-remote] or
> >>>> something like that.
> >>>>
> >>>> Jan
> >>>>
> >>>> [1]
> >>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ClassLoader-created-by-BlobLibraryCacheManager-is-not-using-context-classloader-tp32434.html
> >>>>
> >>>> On 10/29/19 5:50 AM, Thomas Weise wrote:
> >>>>> The current semantics of flink_master are tied to the Flink Java API.
> >>>>> The Flink client / Java API isn't a "REST API". It now uses the REST
> >>>>> API somewhere deep in RemoteEnvironment when the flink_master value is
> >>>>> host:port, but it does a lot of other things as well, such as parsing
> >>>>> config files and running local clusters.
> >>>>>
> >>>>> A rest client to me is a thin wrapper around
> >>>>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html
> >>>>>
> >>>>>
> >>>>> Different ways used in Beam to submit jobs to Flink:
> >>>>>
> >>>>> - classic Flink runner, using Flink Java client
> >>>>> - job server, using Flink Java client
> >>>>> - generated jar, using Flink Java client
> >>>>> - Flink CLI with generated jar, using Flink Java client
> >>>>> - Python "FlinkRunner" using *REST API*
> >>>>>
> >>>>> Since the last item does not submit through the Flink Java client,
> >>>>> there is a problem with the flink_master pipeline option. Though we
> >>>>> cannot change the option in a way that breaks existing setups. Like
> >>>>> Robert suggests, we could allow for optional URL syntax so that it can
> >>>>> be used with the REST API, in which case the code path that goes
> >>>>> through the Java client will have to disassemble such URL before
> >>>>> handing it to Flink.
> >>>>>
> >>>>> I would find it acceptable to interpret absence of the option as
> >>>>> "[auto]", which really means use CLI or REST API context when present,
> >>>>> or local. I would prefer to not have an empty string value default
> >>>>> (but rather None/null) and no additional magic values.
> >>>>>
> >>>>> Thomas
> >>>>>
> >>>>>
> >>>>> On Mon, Oct 28, 2019 at 4:16 PM Kyle Weaver <kcwea...@google.com> wrote:
> >>>>>
> >>>>>      Filed https://issues.apache.org/jira/browse/BEAM-8507 for the
> >>>>>      issue I mentioned.
> >>>>>
> >>>>>      On Mon, Oct 28, 2019 at 4:12 PM Kyle Weaver <kcwea...@google.com> wrote:
> >>>>>
> >>>>>          > I'd like to see this issue resolved before 2.17 as changing
> >>>>>          the public API once it's released will be harder.
> >>>>>
> >>>>>          +1. In particular, I misunderstood that [auto] is not
> >>>>>          supported by `FlinkUberJarJobServer`. Since [auto] is now the
> >>>>>          default, it's broken for Python 3.6+.
> >>>>>
> >>>>>          requests.exceptions.InvalidURL: Failed to parse: [auto]/v1/config
> >>>>>
> >>>>>          We definitely should fix that, if nothing else.
> >>>>>
> >>>>>          > One concern with this is that just supplying host:port is
> >>>>>          the existing
> >>>>>          > behavior, so we can't start requiring the http://.
> >>>>>
> >>>>>          The user shouldn't have to specify a protocol for Python, I
> >>>>>          think it's preferable and reasonable to handle that for them
> >>>>>          in order to maintain existing behavior and align with Java SDK.
> >>>>>
> >>>>>          > 2. Deprecate the "[auto]" and "[local]" values. It should be
> >>>>>          sufficient
> >>>>>          > to have either a non-empty address string or an empty one.
> >>>>>          The empty
> >>>>>          > string would either mean local execution or, in the context
> >>>>>          of the Flink
> >>>>>          > CLI tool, loading the master address from the config. The
> >>>>>          non-empty
> >>>>>          > string would be interpreted as a cluster address.
> >>>>>
> >>>>>          Looks like we also have a [collection] configuration value [1].
> >>>>>
> >>>>>          If we're using [auto] as the default, I don't think this
> >>>>>          really makes so much of a difference (as long as we're
> >>>>>          supporting and documenting these properly, of course). I'm not
> >>>>>          sure there's a compelling reason to change this?
> >>>>>
> >>>>>          > always run locally (the least surprising to me
> >>>>>
> >>>>>          I agree a local cluster should remain the default, whether
> >>>>>          that is achieved through [local] or [auto] or some new
> >>>>>          mechanism such as the above.
> >>>>>
> >>>>>          [1]
> >>>>>          https://github.com/apache/beam/blob/2f1b56ccc506054e40afe4793a8b556e872e1865/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L80
> >>>>>
> >>>>>          On Mon, Oct 28, 2019 at 6:35 AM Maximilian Michels <m...@apache.org> wrote:
> >>>>>
> >>>>>              Hi,
> >>>>>
> >>>>>              Robert and Kyle have been doing great work to simplify
> >>>>>              submitting
> >>>>>              portable pipelines with the Flink Runner. Part of this is
> >>>>>              having a
> >>>>>              Python "FlinkRunner" which handles bringing up a Beam job
> >>>>>              server and
> >>>>>              submitting the pipeline directly via the Flink REST API.
> >>>>>              One building
> >>>>>              block is the creation of "executable Jars" which contain the
> >>>>>              materialized / translated Flink pipeline and do not
> >>>>>              require the Beam job
> >>>>>              server or the Python driver anymore.
> >>>>>
> >>>>>              While unifying a newly introduced option
> >>>>>              "flink_master_url" with the
> >>>>>              pre-existing "flink_master" [1][2], some questions came up
> >>>>>              about Flink's
> >>>>>              execution modes. (The two options are meant to do the same
> >>>>>              thing:
> >>>>>              provide the address of the Flink master to hand-over the
> >>>>>              translated
> >>>>>              pipeline.)
> >>>>>
> >>>>>              Historically, Flink had a proprietary protocol for
> >>>>>              submitting pipelines,
> >>>>>              running on port 9091. This has since been replaced with a
> >>>>>              REST protocol
> >>>>>              at port 8081. To this day, this has implications for how you
> >>>>>              submit
> >>>>>              programs, e.g. the Flink client libraries expect the
> >>>>>              address to be of
> >>>>>              form "host:port", without a protocol scheme. On the other
> >>>>>              hand, external
> >>>>>              Rest libraries typically expect a protocol scheme.
> >>>>>
> >>>>>              But this is only half of the fun. There are also special
> >>>>>              addresses for
> >>>>>              "flink_master" that influence submission of the pipeline.
> >>>>>              If you specify
> >>>>>              "[local]" as the address, the pipeline won't be submitted
> >>>>>              but executed
> >>>>>              in a local in-process Flink cluster. If you specify
> >>>>>              "[auto]" and you use
> >>>>>              the CLI tool that comes bundled with Flink, then the
> >>>>>              master address will
> >>>>>              be loaded from the Flink config, including any
> >>>>>              configuration like SSL.
> >>>>>              If none is found, then it falls back to "[local]".
> >>>>>
> >>>>>              This is a bit odd, and after a discussion with Robert and
> >>>>>              Thomas in [1],
> >>>>>              we figured that this needs to be changed:
> >>>>>
> >>>>>              1. Make the master address a URL. Add "http://" to
> >>>>>              "flink_master" in
> >>>>>              Python if no scheme is specified. Similarly, remove any
> >>>>>              "http://" in
> >>>>>              Java, since the Java rest client does not expect a scheme.
> >>>>>              In case of
> >>>>>              "http_s_://", we have a special treatment to load the SSL
> >>>>>              settings from
> >>>>>              the Flink config.
> >>>>>
> >>>>>              2. Deprecate the "[auto]" and "[local]" values. It should
> >>>>>              be sufficient
> >>>>>              to have either a non-empty address string or an empty one.
> >>>>>              The empty
> >>>>>              string would either mean local execution or, in the
> >>>>>              context of the Flink
> >>>>>              CLI tool, loading the master address from the config. The
> >>>>>              non-empty
> >>>>>              string would be interpreted as a cluster address.
> >>>>>
> >>>>>
> >>>>>              Any opinions on this?
> >>>>>
> >>>>>
> >>>>>              Thanks,
> >>>>>              Max
> >>>>>
> >>>>>
> >>>>>              [1] https://github.com/apache/beam/pull/9803
> >>>>>              [2] https://github.com/apache/beam/pull/9844
> >>>>>
