I want to nitpick slightly the wording of "Java-only runner". I would like/expect that a runner using some specialized Java execution paths would still accept a portable pipeline and use the URNs and URLs to pick out special codepaths, so it is still different from just leaving the old codepaths in place. And it is on a spectrum, per-Fn / per-transform, where you should be using cost estimation to decide whether such a physical plan is profitable.
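The per-Fn / per-transform cost decision described above could look roughly like the following sketch. Everything here is hypothetical — the class names, the overhead constant, and the threshold are illustrative stand-ins, not Beam APIs:

```java
// Hypothetical sketch: a runner choosing, per transform, between a
// specialized native Java codepath and portable SDK-harness execution,
// based on a toy cost estimate. Not actual Beam code.
import java.util.Map;

public class ExecutionPlanner {
    // Assumed relative per-element cost of crossing the Fn API data plane
    // (the extra gRPC hop); 1.0 would mean no overhead.
    private static final double PORTABLE_OVERHEAD_PER_ELEMENT = 1.15;

    public enum Path { NATIVE_JAVA, SDK_HARNESS }

    /**
     * Picks a physical execution path for one transform. A real planner
     * would inspect the transform's URN and environment to see whether a
     * native Java codepath even exists, then compare estimated costs.
     */
    public static Path choosePath(String urn,
                                  Map<String, Path> nativeSupport,
                                  double estimatedElements) {
        Path candidate = nativeSupport.get(urn);
        if (candidate == null) {
            // Runner has never seen this URN; it must hand the bytes
            // to the SDK harness.
            return Path.SDK_HARNESS;
        }
        // Toy cost model: the native path is only profitable when the
        // portable overhead is expected to matter (large element counts).
        return estimatedElements * (PORTABLE_OVERHEAD_PER_ELEMENT - 1.0) > 1000
            ? Path.NATIVE_JAVA
            : Path.SDK_HARNESS;
    }
}
```

The point of the sketch is only that the decision is per-transform and cost-driven; the portable pipeline is still the input either way.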
Kenn

On Thu, Mar 8, 2018 at 12:47 PM Robert Bradshaw <rober...@google.com> wrote:

> All runners should support portable execution for Java, which should be
> just as easy as supporting execution of non-Java pipelines over this API.
>
> As for non-portable "specialized" execution of Java, I think it's a
> tradeoff between the overhead of the portability framework vs. the
> maintenance cost of providing a separate Java-only runner. In time I see
> the former dropping (though there's perhaps a lower bound for how far it
> can go) and the latter increasing, and the cross-over point may be
> different for different runners and users, but I would echo the sentiments
> that portable execution is the baseline.
>
> On Thu, Mar 8, 2018 at 12:38 PM Kenneth Knowles <k...@google.com> wrote:
>
>> +1 to Luke's answer of "yes" for everything to be "portable by default".
>>
>> However, I (always) favor decentralizing this decision as long as the
>> "Beam model" is respected.
>>
>> Baseline:
>> - the input pipeline should always be in portable format
>> - the results of execution should match portable execution (which we
>> have never defined clearly and maybe never will bother to... the Fn API
>> is geared toward performance and ad-hoc use according to a runner's
>> physical plan, but if we decided to build a spec for the pipeline proto
>> it would include at least the part where an SDK owns the semantics of
>> a Fn)
>>
>> So in general each "DoFn" (and other Fn) is a struct with roughly
>> { env = URL, urn = <"name" of fn, modulo parameterization>, bytes =
>> <serialized form that the container understands> }, where the runner has
>> never seen the URL before, may not know the URN, and likely cannot
>> interpret the bytes at all. There is no choice but to ask the SDK to
>> apply it according to the required computational pattern (ParDo, etc.).
>> Any execution strategy that yields the same result is allowable.
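The Fn "struct" Kenn describes can be sketched as a small stand-in. `FnSpec` and the method below are hypothetical illustrations; the real pipeline proto uses its own message types (e.g. FunctionSpec), and the URN string in the test is illustrative:

```java
// Hypothetical sketch of the { env, urn, bytes } Fn struct described above
// and the black-box dispatch it forces on a runner. Not actual Beam code.
import java.util.Set;

public class FnSpecDemo {
    /**
     * env = URL identifying the SDK container, urn = the "name" of the fn
     * (modulo parameterization), payload = serialized form that only the
     * container understands.
     */
    record FnSpec(String environmentUrl, String urn, byte[] payload) {}

    /**
     * A runner that has never seen the environment URL and cannot interpret
     * the payload has no choice but to ship the Fn to the SDK harness;
     * only URNs it understands can be executed directly.
     */
    static String execute(FnSpec fn, Set<String> knownUrns) {
        if (knownUrns.contains(fn.urn())) {
            return "run directly in the runner";
        }
        return "send to SDK harness at " + fn.environmentUrl();
    }
}
```

Any execution strategy is fine as long as the result matches what the SDK would have produced; the dispatch above is just the degenerate "runner knows nothing" baseline.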
>>
>> This format for user Fns is _intended_ to support direct execution by
>> the runner without sending to an SDK and is already used for standard
>> window fns that have an SDK-agnostic proto representation [1]. So the Go
>> SDK can submit a Window.into(<fixed windows of 1 hour>) and the runner
>> can just do that. The case where the URN is "java dofn" and the bytes
>> are a serialized Java DoFn from the user's staged jars is more
>> difficult.
>>
>> However, supporting portable execution alongside this specialization is
>> a lot of maintenance overhead and, as Luke points out, causes other user
>> pain having nothing to do with cross-language requirements. I would
>> definitely reset our perspective to take portable black-box execution as
>> the baseline.
>>
>> Kenn
>>
>> [1]
>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto
>>
>> On Thu, Mar 8, 2018 at 11:18 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> I ran some very pessimistic pipelines that were shuffle heavy (Random
>>> KV -> GBK -> IdentityDoFn) and found that the performance overhead was
>>> 15% when executed with Dataflow. This was a while back and there were a
>>> lot of inefficiencies due to coder encode/decode cycles; based upon
>>> profiling information I surmised that, with some work to reduce the
>>> number of times byte[] are copied, this could get reduced to about 8%.
>>> I can't say how this will impact Flink as it's a different execution
>>> engine, but we should gather data first.
>>>
>>> On Thu, Mar 8, 2018 at 11:10 AM, Thomas Weise <t...@apache.org> wrote:
>>>
>>>> Performance, due to the extra gRPC hop.
>>>>
>>>> On Thu, Mar 8, 2018 at 11:08 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> The goal is to use containers (and similar technologies) in the
>>>>> future. It really hinders pipeline portability between runners if you
>>>>> also have to deal with the dependency conflicts between
>>>>> Flink/Dataflow/Spark/...
>>>>> execution runtimes.
>>>>>
>>>>> What kinds of penalty are you referring to (perf, user complexity,
>>>>> ...)?
>>>>>
>>>>> On Thu, Mar 8, 2018 at 11:02 AM, Thomas Weise <t...@apache.org> wrote:
>>>>>
>>>>>> I'm curious whether pipelines that are exclusively Java will also be
>>>>>> executed (when running on Flink or another JVM-based runner) in
>>>>>> separate harness containers? This would impose a significant penalty
>>>>>> compared to the current execution model. Will this be something the
>>>>>> user can control?
>>>>>>
>>>>>> Thanks,
>>>>>> Thomas
>>>>>>
>>>>>> On Wed, Mar 7, 2018 at 2:09 PM, Aljoscha Krettek
>>>>>> <aljos...@apache.org> wrote:
>>>>>>
>>>>>>> @Axel I assigned https://issues.apache.org/jira/browse/BEAM-2588 to
>>>>>>> you. It might make sense to also grab other issues that you're
>>>>>>> already working on.
>>>>>>>
>>>>>>> On 7. Mar 2018, at 21:18, Aljoscha Krettek <aljos...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Cool, so we had the same ideas. I think this indicates that we're
>>>>>>> not completely on the wrong track with this! ;-)
>>>>>>>
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On 7. Mar 2018, at 21:14, Thomas Weise <t...@apache.org> wrote:
>>>>>>>
>>>>>>> Ben,
>>>>>>>
>>>>>>> Looks like we hit the send button at the same time. Is the plan
>>>>>>> then to derive the Flink implementation of the various execution
>>>>>>> services from those under org.apache.beam.runners.fnexecution?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> On Wed, Mar 7, 2018 at 11:02 AM, Thomas Weise <t...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> What's the plan for the endpoints that the Flink operator needs to
>>>>>>>> provide (control/data plane, state, logging)? Is the intention to
>>>>>>>> provide base implementations that can be shared across runners and
>>>>>>>> then implement the Flink-specific parts on top of them? Has work
>>>>>>>> started on those?
>>>>>>>>
>>>>>>>> If there are subtasks ready to be taken up I would be interested.
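The layering Thomas asks about (shared base implementations of the Fn API services, with runner-specific wiring on top) could be sketched like this. These class names are hypothetical, not the actual `org.apache.beam.runners.fnexecution` classes:

```java
// Hypothetical sketch of shared Fn API service scaffolding that a runner
// specializes. None of these names are real Beam classes.
public class FnServiceLayering {
    /**
     * Shared base: generic lifecycle for one Fn API endpoint
     * (control plane, data plane, state, or logging).
     */
    abstract static class FnApiService {
        private boolean running;

        final void start() {
            running = true;
            onStart();
        }

        final void stop() {
            running = false;
        }

        final boolean isRunning() {
            return running;
        }

        /** Runner-specific wiring (e.g. binding to a Flink operator) goes here. */
        abstract void onStart();

        abstract String name();
    }

    /** Flink-specific piece: only the operator wiring differs. */
    static class FlinkDataService extends FnApiService {
        @Override
        void onStart() {
            // A real implementation would bind the gRPC data plane to the
            // Flink operator here.
        }

        @Override
        String name() {
            return "flink-data-plane";
        }
    }
}
```

The design question in the thread is exactly where this split should sit; the sketch only illustrates one plausible shape.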
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Thomas
>>>>>>>>
>>>>>>>> On Wed, Mar 7, 2018 at 9:35 AM, Ben Sidhom <sid...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Yes, Axel has started work on such a shim.
>>>>>>>>>
>>>>>>>>> Our plan in the short term is to keep the old FlinkRunner around
>>>>>>>>> and to call into it to process jobs from the job service itself.
>>>>>>>>> That way we can keep the non-portable runner fully functional
>>>>>>>>> while working on portability. Eventually, I think it makes sense
>>>>>>>>> for this to go away, but we haven't given much thought to that.
>>>>>>>>> The translator layer will likely stay the same, and the
>>>>>>>>> FlinkRunner bits are a relatively simple wrapper around
>>>>>>>>> translation, so it should be simple enough to factor this out.
>>>>>>>>>
>>>>>>>>> Much of the service code from the Universal Local Runner (ULR)
>>>>>>>>> should be composed and reused with other runner implementations.
>>>>>>>>> Thomas and Axel have more context around that.
>>>>>>>>>
>>>>>>>>> On Wed, Mar 7, 2018 at 8:47 AM Aljoscha Krettek
>>>>>>>>> <aljos...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Has anyone started on
>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-2588 (FlinkRunner
>>>>>>>>>> shim for serving the Job API)? If not, I would start on that.
>>>>>>>>>>
>>>>>>>>>> My plan is to implement a FlinkJobService that implements
>>>>>>>>>> JobServiceImplBase, similar to ReferenceRunnerJobService. This
>>>>>>>>>> would have a lot of the functionality that FlinkRunner currently
>>>>>>>>>> has. As a next step, I would add a JobServiceRunner that can
>>>>>>>>>> submit Pipelines to a JobService.
>>>>>>>>>>
>>>>>>>>>> For testing, I would probably add functionality that allows
>>>>>>>>>> spinning up a JobService in-process with the JobServiceRunner.
>>>>>>>>>> I can imagine that for testing we could even eventually use
>>>>>>>>>> something like: "--runner=JobServiceRunner", "--streaming=true",
>>>>>>>>>> "--jobService=FlinkRunnerJobService".
>>>>>>>>>>
>>>>>>>>>> Once all of this is done, we only need the Python component that
>>>>>>>>>> talks to the JobService to submit a pipeline.
>>>>>>>>>>
>>>>>>>>>> What do you think about the plan?
>>>>>>>>>>
>>>>>>>>>> Btw, I feel that the thing currently called Runner, i.e.
>>>>>>>>>> FlinkRunner, will go away in the long run and we will have
>>>>>>>>>> FlinkJobService, SparkJobService and whatnot. What do you think?
>>>>>>>>>>
>>>>>>>>>> Aljoscha
>>>>>>>>>>
>>>>>>>>>> On 9. Feb 2018, at 01:31, Ben Sidhom <sid...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hey all,
>>>>>>>>>>
>>>>>>>>>> We're working on getting the portability framework plumbed
>>>>>>>>>> through the Flink runner. The first iteration will likely only
>>>>>>>>>> support batch and will be limited in its deployment flexibility,
>>>>>>>>>> but hopefully it shouldn't be too painful to expand this.
>>>>>>>>>>
>>>>>>>>>> We have the start of a tracking doc here:
>>>>>>>>>> https://s.apache.org/portable-beam-on-flink.
>>>>>>>>>>
>>>>>>>>>> We've documented the general deployment strategy here:
>>>>>>>>>> https://s.apache.org/portable-flink-runner-overview.
>>>>>>>>>>
>>>>>>>>>> Feel free to provide comments on the docs or jump in on any of
>>>>>>>>>> the referenced bugs.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -Ben
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -Ben
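Aljoscha's FlinkJobService plan could be sketched roughly like this. `JobServiceImplBase` below is a simplified stand-in for the generated gRPC base class, and the method shapes (string pipeline, prepare/run split) are illustrative, not the actual Job API signatures:

```java
// Hypothetical sketch of a FlinkJobService serving the Job API and handing
// accepted pipelines to Flink for translation. Not actual Beam code.
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class FlinkJobServiceSketch {
    /** Stand-in for the generated gRPC JobService base class. */
    abstract static class JobServiceImplBase {
        abstract String prepare(String pipelineProto);
        abstract String run(String preparationId);
    }

    /** Accepts portable pipelines and submits them as Flink jobs. */
    static class FlinkJobService extends JobServiceImplBase {
        private final Map<String, String> preparations = new HashMap<>();

        @Override
        String prepare(String pipelineProto) {
            // Stage the pipeline and hand back a preparation id.
            String id = UUID.randomUUID().toString();
            preparations.put(id, pipelineProto);
            return id;
        }

        @Override
        String run(String preparationId) {
            String pipeline = preparations.get(preparationId);
            if (pipeline == null) {
                throw new IllegalArgumentException("unknown preparation");
            }
            // Here the real service would translate the portable pipeline
            // proto and submit the resulting Flink job, much as the
            // FlinkRunner translation layer does today.
            return "job-" + preparationId;
        }
    }
}
```

For the in-process testing idea in the thread, a JobServiceRunner would construct one of these directly instead of dialing a remote endpoint.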