Thanks for the follow-up, Thomas.

On Mon, Oct 28, 2019 at 7:55 PM Thomas Weise <[email protected]> wrote:
> Follow-up for users looking to run portable pipelines on Flink:
>
> After prototyping the generate-jar-file approach for internal deployment
> and some related discussion, the conclusion was that it is too limiting.
> The sticking point is that the jar file would need to be generated at
> container build time. That does not allow us to execute any logic in the
> Python driver program that depends on the deploy environment, such as
> retrieval of environment variables for configuration/credentials, setting
> a submission timestamp for stream positioning, etc.
>
> What worked well was that no job server was required to submit the Flink
> job, and the jar file could be used with the existing Flink tooling; there
> was no need to change the FlinkK8sOperator
> <https://github.com/lyft/flinkk8soperator> at all.
>
> I then looked for a way to eliminate the build-time translation and
> execute the Python driver program when the job is submitted, but still as
> a Flink entry point, w/o extra job server deployment and client-side
> dependencies. How can that work?
>
> https://issues.apache.org/jira/browse/BEAM-8471
>
> The main point was that there should be no requirement to install things
> on the client. FlinkK8sOperator talks to the Flink REST API, w/o Python
> or Java. The Python dependencies need to be present on the Flink job
> manager host at the time the job is started through the REST API. That
> was something we had already solved for our container image build, and
> from conversations with a few other folks, this was their preferred
> container build approach as well.
>
> In the future we may seek the ability to separate the Flink and
> SDK/application bits into different images. For the SDK worker, this is
> intended via the external environment and sidecar container. For the
> client driver program, a similar approach could be implemented through an
> "external client environment" instead of local process execution.
>
> The new Flink runner can be used as an entry point for the REST API, the
> Flink CLI, or standalone, especially for Flink-centric automation. Of
> course, portable pipelines can also be directly submitted through the SDK
> language client, via the job server or other tooling, like the Python
> Flink client that Robert contributed recently.
>
> Thanks,
> Thomas
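For anyone following along: going through the Flink REST API needs only two
calls, POST /jars/upload and then POST /jars/:jarid/run. A minimal Java sketch
of the second call; the job manager address, jar id, and program arguments are
placeholders, and the entry class assumes the main method discussed further
down this thread:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class FlinkRestSubmit {
      public static void main(String[] args) throws Exception {
        String jobManager = "http://flink-jobmanager:8081";  // placeholder address
        String jarId = "<id-returned-by-POST-/jars/upload>"; // placeholder jar id

        // entryClass and programArgs are standard fields of the request body
        // for Flink's /jars/:jarid/run endpoint; the values are illustrative.
        String body = "{\"entryClass\": \"org.apache.beam.runners.flink.FlinkPipelineRunner\","
            + " \"programArgs\": \"<args-for-the-entry-point>\"}";

        HttpRequest run = HttpRequest.newBuilder()
            .uri(URI.create(jobManager + "/jars/" + jarId + "/run"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();

        HttpResponse<String> resp =
            HttpClient.newHttpClient().send(run, HttpResponse.BodyHandlers.ofString());
        System.out.println(resp.body()); // on success: {"jobid": "..."}
      }
    }

On success the response carries the Flink job id, which is what a deployer
like FlinkK8sOperator can use to track the job afterwards.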
> On Thu, Aug 22, 2019 at 12:58 PM Kyle Weaver <[email protected]> wrote:
>
>> Following up on discussion in this morning's OSS runners meeting, I have
>> uploaded a draft PR for the full implementation (job creation +
>> execution): https://github.com/apache/beam/pull/9408
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | [email protected]
>>
>> On Tue, Aug 20, 2019 at 1:24 PM Robert Bradshaw <[email protected]> wrote:
>>
>>> The point of expansion services is to run at pipeline construction
>>> time so that the caller can build on top of the outputs. E.g. we're
>>> hoping to expose Beam's SQL transforms to other languages via an
>>> expansion service and *not* duplicate the logic of parsing the SQL
>>> statements to determine the type(s) of the outputs. Even for simpler
>>> IOs, we would like to take advantage of schema information (e.g.
>>> looked up at construction time) to produce results and validate (or
>>> even inform) subsequent construction.
>>>
>>> I think we're also making a mistake in talking about "the" expansion
>>> service here, as if there were only one well-defined service that all
>>> pipelines used. If we go the route of deferring some expansion to the
>>> runner, we need a way of naming expansion services. It seems like this
>>> proposal is simply isomorphic to defining new primitive transforms
>>> which some (all?) runners are just expected to understand.
>>>
>>> On Tue, Aug 20, 2019 at 10:11 AM Thomas Weise <[email protected]> wrote:
>>> >
>>> > On Tue, Aug 20, 2019 at 8:56 AM Lukasz Cwik <[email protected]> wrote:
>>> >>
>>> >> On Mon, Aug 19, 2019 at 5:52 PM Ahmet Altay <[email protected]> wrote:
>>> >>>
>>> >>> On Sun, Aug 18, 2019 at 12:34 PM Thomas Weise <[email protected]> wrote:
>>> >>>>
>>> >>>> There is a PR open for this: https://github.com/apache/beam/pull/9331
>>> >>>> (it wasn't tagged with the JIRA and therefore not linked)
>>> >>>>
>>> >>>> I think it is worthwhile to explore how we could further detangle
>>> >>>> the client-side Python and Java dependencies.
>>> >>>>
>>> >>>> The expansion service is one more dependency to consider in a
>>> >>>> build environment. Is it really necessary to expand external
>>> >>>> transforms prior to submission to the job service?
>>> >>>
>>> >>> +1, this will make it easier to use external transforms from the
>>> >>> already familiar client environments.
>>> >>
>>> >> The intent is to make it so that you CAN (not MUST) run an expansion
>>> >> service separate from a Runner. Creating a single endpoint that
>>> >> hosts both the Job and Expansion services is something that gRPC
>>> >> does very easily, since you can host multiple service definitions
>>> >> on a single port.
>>> >
>>> > Yes, that's fine. The point here is when the expansion occurs. I
>>> > believe the runner can also invoke the expansion service, thereby
>>> > eliminating the expansion service interaction from the client side.
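To make the single-endpoint point concrete: it comes down to two addService
calls on one gRPC server. A sketch, assuming the Job API and Expansion API
implementations are exposed as gRPC BindableServices; the class and method
names here are illustrative, not Beam's actual wiring:

    import io.grpc.BindableService;
    import io.grpc.Server;
    import io.grpc.ServerBuilder;

    public class CombinedEndpoint {
      // jobService and expansionService are whatever gRPC BindableService
      // implementations the runner provides for the Job and Expansion APIs.
      static Server serve(int port, BindableService jobService, BindableService expansionService)
          throws java.io.IOException {
        return ServerBuilder.forPort(port)
            .addService(jobService)       // Job API on the same port...
            .addService(expansionService) // ...as the Expansion API
            .build()
            .start();
      }
    }

A client then needs only one address for both job submission and transform
expansion.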
>>> >>>> Can we come up with a partially constructed proto that can be
>>> >>>> produced by just running the Python entry point? Note this would
>>> >>>> also require pushing the pipeline options parsing into the job
>>> >>>> service.
>>> >>>
>>> >>> Why would this require pushing the pipeline options parsing to the
>>> >>> job service? Assuming that Python has enough of an idea about the
>>> >>> external transform to know what options it will need, the necessary
>>> >>> bits could be converted to arguments and be part of that partially
>>> >>> constructed proto.
>>> >>>>
>>> >>>> On Sun, Aug 18, 2019 at 12:01 PM enrico canzonieri
>>> >>>> <[email protected]> wrote:
>>> >>>>>
>>> >>>>> I found the tracking ticket at BEAM-7966
>>> >>>>>
>>> >>>>> On Sun, Aug 18, 2019 at 11:59 AM enrico canzonieri
>>> >>>>> <[email protected]> wrote:
>>> >>>>>>
>>> >>>>>> Is this alternative still being considered? Creating a portable
>>> >>>>>> jar sounds like a good solution to re-use the existing
>>> >>>>>> runner-specific deployment mechanism (e.g. the Flink k8s
>>> >>>>>> operator) and in general simplify the deployment story.
>>> >>>>>>
>>> >>>>>> On Fri, Aug 9, 2019 at 12:46 AM Robert Bradshaw
>>> >>>>>> <[email protected]> wrote:
>>> >>>>>>>
>>> >>>>>>> The expansion service is a separate service. (The Flink jar
>>> >>>>>>> happens to bring both up.) However, there is negotiation to
>>> >>>>>>> receive/validate the pipeline options.
>>> >>>>>>>
>>> >>>>>>> On Fri, Aug 9, 2019 at 1:54 AM Thomas Weise <[email protected]> wrote:
>>> >>>>>>> >
>>> >>>>>>> > We would also need to consider cross-language pipelines that
>>> >>>>>>> > (currently) assume the interaction with an expansion service
>>> >>>>>>> > at construction time.
>>> >>>>>>> >
>>> >>>>>>> > On Thu, Aug 8, 2019, 4:38 PM Kyle Weaver <[email protected]> wrote:
>>> >>>>>>> >>
>>> >>>>>>> >> > It might also be useful to have the option to just output
>>> >>>>>>> >> > the proto and artifacts, as an alternative to the jar file.
>>> >>>>>>> >>
>>> >>>>>>> >> Sure, that wouldn't be too big a change if we were to decide
>>> >>>>>>> >> to go the SDK route.
>>> >>>>>>> >>
>>> >>>>>>> >> > For the Flink entry point we would need to allow for the
>>> >>>>>>> >> > job server to be used as a library.
>>> >>>>>>> >>
>>> >>>>>>> >> We don't need the whole job server; we only need to add a
>>> >>>>>>> >> main method to FlinkPipelineRunner [1] as the entry point,
>>> >>>>>>> >> which would basically just do the setup described in the doc
>>> >>>>>>> >> and then call FlinkPipelineRunner::run.
>>> >>>>>>> >>
>>> >>>>>>> >> [1] https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java#L53
>>> >>>>>>> >>
>>> >>>>>>> >> Kyle Weaver | Software Engineer | github.com/ibzib | [email protected]
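A rough sketch of what such an entry point could look like; the proto
location and the surrounding setup are assumptions on my part, not the
contents of the PR:

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import org.apache.beam.model.pipeline.v1.RunnerApi;

    public class FlinkEntryPoint {
      public static void main(String[] args) throws Exception {
        // 1. Deserialize the pipeline the SDK constructed ahead of time.
        //    The location (here: first program argument) is a hypothetical
        //    convention, e.g. a file baked into the jar or container image.
        RunnerApi.Pipeline pipeline =
            RunnerApi.Pipeline.parseFrom(Files.readAllBytes(Paths.get(args[0])));

        // 2. Recover pipeline options (in practice a serialized blob parsed
        //    into FlinkPipelineOptions); elided in this sketch.

        // 3. Hand both to the Flink translation/execution path, i.e. what
        //    FlinkPipelineRunner::run does today. Launched through the Flink
        //    REST API, this would inherit the cluster's execution context,
        //    so no job server is involved.
      }
    }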
>>> >>>>>>> >>
>>> >>>>>>> >> On Thu, Aug 8, 2019 at 4:21 PM Thomas Weise <[email protected]> wrote:
>>> >>>>>>> >>>
>>> >>>>>>> >>> Hi Kyle,
>>> >>>>>>> >>>
>>> >>>>>>> >>> It might also be useful to have the option to just output
>>> >>>>>>> >>> the proto and artifacts, as an alternative to the jar file.
>>> >>>>>>> >>>
>>> >>>>>>> >>> For the Flink entry point we would need to allow for the
>>> >>>>>>> >>> job server to be used as a library. It would probably not
>>> >>>>>>> >>> be too hard to have the Flink job constructed via the
>>> >>>>>>> >>> context execution environment, which would require no
>>> >>>>>>> >>> changes on the Flink side.
>>> >>>>>>> >>>
>>> >>>>>>> >>> Thanks,
>>> >>>>>>> >>> Thomas
>>> >>>>>>> >>>
>>> >>>>>>> >>> On Thu, Aug 8, 2019 at 9:52 AM Kyle Weaver <[email protected]> wrote:
>>> >>>>>>> >>>>
>>> >>>>>>> >>>> Re the Java-less/serverless solution: I take it this would
>>> >>>>>>> >>>> probably mean that we would construct the jar directly from
>>> >>>>>>> >>>> the SDK. There are advantages to this: full separation of
>>> >>>>>>> >>>> Python and Java environments, no need for a job server, and
>>> >>>>>>> >>>> likely a simpler implementation, since we'd no longer have
>>> >>>>>>> >>>> to work within the constraints of the existing job server
>>> >>>>>>> >>>> infrastructure. The only downside I can think of is the
>>> >>>>>>> >>>> additional cost of implementing/maintaining jar creation
>>> >>>>>>> >>>> code in each SDK, but that cost may be acceptable if it's
>>> >>>>>>> >>>> simple enough.
>>> >>>>>>> >>>>
>>> >>>>>>> >>>> Kyle Weaver | Software Engineer | github.com/ibzib | [email protected]
>>> >>>>>>> >>>>
>>> >>>>>>> >>>> On Thu, Aug 8, 2019 at 9:31 AM Thomas Weise <[email protected]> wrote:
>>> >>>>>>> >>>>>
>>> >>>>>>> >>>>> On Thu, Aug 8, 2019 at 8:29 AM Robert Bradshaw <[email protected]> wrote:
>>> >>>>>>> >>>>>>
>>> >>>>>>> >>>>>> > Before assembling the jar, the job server runs to
>>> >>>>>>> >>>>>> > create the ingredients. That requires the (matching)
>>> >>>>>>> >>>>>> > Java environment on the Python developer's machine.
>>> >>>>>>> >>>>>>
>>> >>>>>>> >>>>>> We can run the job server and have it create the jar
>>> >>>>>>> >>>>>> (and if we keep the job server running we can use it to
>>> >>>>>>> >>>>>> interact with the running job). However, if the jar
>>> >>>>>>> >>>>>> layout is simple enough, there's no need to even build
>>> >>>>>>> >>>>>> it from Java.
>>> >>>>>>> >>>>>>
>>> >>>>>>> >>>>>> Taken to the extreme, this is a one-shot, jar-based
>>> >>>>>>> >>>>>> JobService API. We choose a standard layout of where to
>>> >>>>>>> >>>>>> put the pipeline description and artifacts, and can
>>> >>>>>>> >>>>>> "augment" an existing jar (one that has a
>>> >>>>>>> >>>>>> runner-specific main class whose entry point knows how
>>> >>>>>>> >>>>>> to read this data to kick off a pipeline as if it were a
>>> >>>>>>> >>>>>> user's driver code) into one that has a portable
>>> >>>>>>> >>>>>> pipeline packaged into it for submission to a cluster.
>>> >>>>>>> >>>>>
>>> >>>>>>> >>>>> It would be nice if the Python developer didn't have to
>>> >>>>>>> >>>>> run anything Java at all.
>>> >>>>>>> >>>>>
>>> >>>>>>> >>>>> As we just discussed offline, this could be accomplished
>>> >>>>>>> >>>>> by including the proto that is produced by the SDK in the
>>> >>>>>>> >>>>> pre-existing jar.
>>> >>>>>>> >>>>>
>>> >>>>>>> >>>>> And if the jar has an entry point that creates the Flink
>>> >>>>>>> >>>>> job in the prescribed manner [1], it can be directly
>>> >>>>>>> >>>>> submitted to the Flink REST API. That would allow for a
>>> >>>>>>> >>>>> Java-free client.
>>> >>>>>>> >>>>>
>>> >>>>>>> >>>>> [1] https://lists.apache.org/thread.html/6db869c53816f4e2917949a7c6992c2b90856d7d639d7f2e1cd13768@%3Cdev.flink.apache.org%3E
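To close the loop on the "standard layout" idea above: the runner-specific
main class would read the pipeline back out of its own jar, along the lines
of the sketch below. The resource path is a hypothetical placeholder, not an
established Beam convention:

    import java.io.InputStream;
    import org.apache.beam.model.pipeline.v1.RunnerApi;

    public class JarPackagedPipeline {
      static RunnerApi.Pipeline loadFromOwnJar() throws Exception {
        // Hypothetical layout: /BEAM-PIPELINE/pipeline.pb next to the
        // compiled classes, written there by the SDK when it "augmented"
        // the jar; artifacts would live under a similar agreed-upon path.
        try (InputStream in =
            JarPackagedPipeline.class.getResourceAsStream("/BEAM-PIPELINE/pipeline.pb")) {
          if (in == null) {
            throw new IllegalStateException("No pipeline packaged into this jar");
          }
          return RunnerApi.Pipeline.parseFrom(in);
        }
      }
    }

Because the jar is self-describing, any SDK that can write a proto and zip
files can produce it, which is what makes the Java-free client possible.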
