+1 to Luke's answer of "yes" for everything to be "portable by default".
However, I (always) favor decentralizing this decision as long as the "Beam model" is respected. Baseline:

- the input pipeline should always be in portable format
- the results of execution should match portable execution (which we have never defined clearly and maybe never will bother to... the Fn API is geared toward performance and ad-hoc use according to a runner's physical plan, but if we decided to build a spec for the pipeline proto, it would include at least the part where an SDK owns the semantics of a Fn)

So in general each "DoFn" (and other Fn) is a struct with roughly

    { env = URL,
      urn = <"name" of fn, modulo parameterization>,
      bytes = <serialized form that the container understands> }

where the runner has never seen the URL before, may not know the URN, and likely cannot interpret the bytes at all. There is no choice but to ask the SDK to apply it according to the required computational pattern (ParDo, etc). Any execution strategy that yields the same result is allowable.

This format for user Fns is _intended_ to support direct execution by the runner without sending to an SDK, and it is already used for standard window fns that have an SDK-agnostic proto representation [1]. So the Go SDK can submit a Window.into(<fixed windows of 1 hour>) and the runner can just do that. The case where the URN is "java dofn" and the bytes are a serialized Java DoFn from the user's staged jars is more difficult. However, supporting portable execution alongside this specialization is a lot of maintenance overhead and, as Luke points out, causes other user pain having nothing to do with cross-language requirements.

I would definitely reset our perspective to take portable black-box execution as the baseline.
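The dispatch logic described above (run a Fn natively when the runner understands the URN, otherwise hand it to the owning SDK) can be sketched in plain Python. This is only an illustrative mirror of the pipeline proto's function-spec shape; the class and field names here are hypothetical, not Beam's actual API, and the "known URN" is meant to stand in for whatever the runner recognizes from standard_window_fns.proto.

```python
from dataclasses import dataclass

# Hypothetical, simplified mirror of the { env, urn, bytes } struct
# from the message above; names are illustrative, not Beam's proto fields.
@dataclass(frozen=True)
class FnSpec:
    env: str        # URL identifying the SDK environment (e.g. a container)
    urn: str        # "name" of the fn, modulo parameterization
    payload: bytes  # serialized form only that environment understands

# URNs the runner can execute directly, without an SDK harness.
# A fixed-windows URN of roughly this shape is defined in
# standard_window_fns.proto; treat the exact string as an assumption.
NATIVE_URNS = {"beam:window_fn:fixed_windows:v1"}

def execute(fn: FnSpec) -> str:
    """Run natively if the URN is understood; else defer to the SDK."""
    if fn.urn in NATIVE_URNS:
        # SDK-agnostic proto representation: the runner can "just do that".
        return f"runner executes {fn.urn} directly"
    # The runner cannot interpret the payload (e.g. a serialized Java DoFn);
    # ship it to the SDK harness identified by fn.env and ask it to apply
    # the fn under the required computational pattern (ParDo, etc).
    return f"delegate {fn.urn} to SDK harness at {fn.env}"
```

For example, a Go SDK submitting fixed windows takes the first branch, while a "java dofn" URN from staged jars takes the second.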
Kenn

[1] https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto

On Thu, Mar 8, 2018 at 11:18 AM Lukasz Cwik <lc...@google.com> wrote:

> I ran some very pessimistic pipelines that were shuffle heavy (Random KV
> -> GBK -> IdentityDoFn) and found that the performance overhead was 15%
> when executed with Dataflow. This was a while back and there were a lot
> of inefficiencies due to coder encode/decode cycles; based on profiling
> information I surmised that with some work to reduce the number of times
> byte[] are copied, this could get down to about 8%. I can't say how this
> will impact Flink as it's a different execution engine, but we should
> gather data first.
>
> On Thu, Mar 8, 2018 at 11:10 AM, Thomas Weise <t...@apache.org> wrote:
>
>> Performance, due to the extra gRPC hop.
>>
>> On Thu, Mar 8, 2018 at 11:08 AM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> The goal is to use containers (and similar technologies) in the future.
>>> It really hinders pipeline portability between runners if you also have
>>> to deal with dependency conflicts between Flink/Dataflow/Spark/...
>>> execution runtimes.
>>>
>>> What kind of penalty are you referring to (perf, user complexity, ...)?
>>>
>>> On Thu, Mar 8, 2018 at 11:02 AM, Thomas Weise <t...@apache.org> wrote:
>>>
>>>> I'm curious whether pipelines that are exclusively Java will also be
>>>> executed in separate harness containers (when running on Flink or
>>>> another JVM-based runner)? This would impose a significant penalty
>>>> compared to the current execution model. Will this be something the
>>>> user can control?
>>>>
>>>> Thanks,
>>>> Thomas
>>>>
>>>> On Wed, Mar 7, 2018 at 2:09 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>
>>>>> @Axel I assigned https://issues.apache.org/jira/browse/BEAM-2588 to
>>>>> you. It might make sense to also grab other issues that you're
>>>>> already working on.
>>>>>
>>>>> On 7. Mar 2018, at 21:18, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>
>>>>> Cool, so we had the same ideas. I think this indicates that we're not
>>>>> completely on the wrong track with this! ;-)
>>>>>
>>>>> Aljoscha
>>>>>
>>>>> On 7. Mar 2018, at 21:14, Thomas Weise <t...@apache.org> wrote:
>>>>>
>>>>> Ben,
>>>>>
>>>>> Looks like we hit the send button at the same time. Is the plan then
>>>>> to derive the Flink implementation of the various execution services
>>>>> from those under org.apache.beam.runners.fnexecution?
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Wed, Mar 7, 2018 at 11:02 AM, Thomas Weise <t...@apache.org> wrote:
>>>>>
>>>>>> What's the plan for the endpoints that the Flink operator needs to
>>>>>> provide (control/data plane, state, logging)? Is the intention to
>>>>>> provide base implementations that can be shared across runners and
>>>>>> then implement the Flink-specific parts on top of them? Has work
>>>>>> started on those?
>>>>>>
>>>>>> If there are subtasks ready to be taken up, I would be interested.
>>>>>>
>>>>>> Thanks,
>>>>>> Thomas
>>>>>>
>>>>>> On Wed, Mar 7, 2018 at 9:35 AM, Ben Sidhom <sid...@google.com> wrote:
>>>>>>
>>>>>>> Yes, Axel has started work on such a shim.
>>>>>>>
>>>>>>> Our plan in the short term is to keep the old FlinkRunner around
>>>>>>> and to call into it to process jobs from the job service itself.
>>>>>>> That way we can keep the non-portable runner fully functional while
>>>>>>> working on portability. Eventually, I think it makes sense for this
>>>>>>> to go away, but we haven't given much thought to that. The
>>>>>>> translator layer will likely stay the same, and the FlinkRunner
>>>>>>> bits are a relatively simple wrapper around translation, so it
>>>>>>> should be simple enough to factor this out.
>>>>>>>
>>>>>>> Much of the service code from the Universal Local Runner (ULR)
>>>>>>> should be composed and reused with other runner implementations.
>>>>>>> Thomas and Axel have more context around that.
>>>>>>>
>>>>>>> On Wed, Mar 7, 2018 at 8:47 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Has anyone started on
>>>>>>>> https://issues.apache.org/jira/browse/BEAM-2588 (FlinkRunner shim
>>>>>>>> for serving the Job API)? If not, I would start on that.
>>>>>>>>
>>>>>>>> My plan is to implement a FlinkJobService that implements
>>>>>>>> JobServiceImplBase, similar to ReferenceRunnerJobService. This
>>>>>>>> would have a lot of the functionality that FlinkRunner currently
>>>>>>>> has. As a next step, I would add a JobServiceRunner that can
>>>>>>>> submit Pipelines to a JobService.
>>>>>>>>
>>>>>>>> For testing, I would probably add functionality that allows
>>>>>>>> spinning up a JobService in-process with the JobServiceRunner.
>>>>>>>> I can imagine for testing we could even eventually use something
>>>>>>>> like: "--runner=JobServiceRunner", "--streaming=true",
>>>>>>>> "--jobService=FlinkRunnerJobService".
>>>>>>>>
>>>>>>>> Once all of this is done, we only need the Python component that
>>>>>>>> talks to the JobService to submit a pipeline.
>>>>>>>>
>>>>>>>> What do you think about the plan?
>>>>>>>>
>>>>>>>> Btw, I feel that the thing currently called Runner, i.e.
>>>>>>>> FlinkRunner, will go away in the long run and we will have
>>>>>>>> FlinkJobService, SparkJobService and whatnot. What do you think?
>>>>>>>>
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> On 9. Feb 2018, at 01:31, Ben Sidhom <sid...@google.com> wrote:
>>>>>>>>
>>>>>>>> Hey all,
>>>>>>>>
>>>>>>>> We're working on getting the portability framework plumbed through
>>>>>>>> the Flink runner. The first iteration will likely only support
>>>>>>>> batch and will be limited in its deployment flexibility, but
>>>>>>>> hopefully it shouldn't be too painful to expand this.
>>>>>>>>
>>>>>>>> We have the start of a tracking doc here:
>>>>>>>> https://s.apache.org/portable-beam-on-flink.
>>>>>>>>
>>>>>>>> We've documented the general deployment strategy here:
>>>>>>>> https://s.apache.org/portable-flink-runner-overview.
>>>>>>>>
>>>>>>>> Feel free to provide comments on the docs or jump in on any of the
>>>>>>>> referenced bugs.
>>>>>>>>
>>>>>>>> --
>>>>>>>> -Ben
>>>>>>>
>>>>>>> --
>>>>>>> -Ben
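The prepare/run lifecycle behind the FlinkJobService plan discussed in this thread can be sketched in plain Python. This is only an in-memory stand-in under stated assumptions: the real service is a gRPC server (JobServiceImplBase generated from the Job API proto), and the method names and `InMemoryJobService` class here are hypothetical simplifications, not Beam's actual interfaces.

```python
import enum
import uuid
from abc import ABC, abstractmethod

class JobState(enum.Enum):
    STOPPED = 1
    RUNNING = 2
    DONE = 3

# Hypothetical plain-Python analogue of the Job API surface; the real
# thing is a gRPC service, and a FlinkJobService would implement it by
# translating the portable pipeline and submitting it to Flink.
class JobService(ABC):
    @abstractmethod
    def prepare(self, pipeline) -> str: ...
    @abstractmethod
    def run(self, preparation_id: str) -> str: ...
    @abstractmethod
    def get_state(self, job_id: str) -> JobState: ...

class InMemoryJobService(JobService):
    """Toy in-process service, as one might spin up for testing."""
    def __init__(self):
        self._prepared = {}
        self._jobs = {}

    def prepare(self, pipeline) -> str:
        # A real implementation would translate the portable pipeline
        # proto here (the translator layer mentioned above).
        prep_id = str(uuid.uuid4())
        self._prepared[prep_id] = pipeline
        return prep_id

    def run(self, preparation_id: str) -> str:
        pipeline = self._prepared.pop(preparation_id)
        job_id = str(uuid.uuid4())
        # Toy behavior: the "job" completes immediately; a real service
        # would submit to the execution engine and track state async.
        self._jobs[job_id] = (pipeline, JobState.DONE)
        return job_id

    def get_state(self, job_id: str) -> JobState:
        return self._jobs[job_id][1]
```

An in-process instance like this is also how a JobServiceRunner-style test harness could drive submission without standing up a cluster.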