I want to nitpick slightly the wording of "Java-only runner". I would
like/expect that a runner using some specialized Java execution paths would
still be accepting a portable pipeline and using the URNs and URLs to pick
out special codepaths, so it is still different than just leaving the old
codepaths in place. And it is on a spectrum, per-Fn / per-transform, where
you should be using cost estimation to decide whether such a physical plan
is profitable.

Kenn

On Thu, Mar 8, 2018 at 12:47 PM Robert Bradshaw <rober...@google.com> wrote:

> All runners should support portable execution for Java, which should be
> just as easy as supporting execution of non-Java pipelines over this API.
>
> As for non-portable "specialized" execution of Java, I think it's a
> tradeoff between the overhead of the portability framework vs. the
> maintenance cost of providing a separate java-only runner. In time I see
> the former dropping (though there's perhaps a lower bound for how far it
> can go) and the latter increasing, and the cross-over point may be
> different for different runners and users, but would echo the sentiments
> that portable execution is the baseline.
>
>
> On Thu, Mar 8, 2018 at 12:38 PM Kenneth Knowles <k...@google.com> wrote:
>
>> +1 to Luke's answer of "yes" for everything to be "portable by default".
>>
>> However, I (always) favor decentralizing this decision as long as the
>> "Beam model" is respected.
>>
>> Baseline:
>>  - the input pipeline should always be in portable format
>>  - the results of execution should match portable execution (which we
>> have never defined clearly and maybe never will bother... the Fn API is
>> geared toward performance and ad-hoc use according to a runner's physical
>> plan, but if we decided to build a spec for the pipeline proto it would
>> include at least the part where an SDK owns the semantics of a Fn)
>>
>> So in general each "DoFn" (and other Fn) is a struct with roughly { env =
>> URL, urn = <"name" of fn, modulo parameterization>, bytes = <serialized
>> form that the container understands> } where the runner has never seen the
>> URL before, may not know the URN, and likely cannot interpret the bytes at
>> all. There is no choice but to ask the SDK to apply it according to the
>> required computational pattern (ParDo, etc). Any execution strategy that
>> yields the same result is allowable.
>>
>> This format for user Fns is _intended_ to support direct execution by the
>> runner without sending to an SDK and is already used for standard window
>> fns that have an SDK-agnostic proto representation [1]. So the Go SDK can
>> submit a Window.into(<fixed windows of 1 hour>) and the runner can just do
>> that. The case where the URN is "java dofn" and the bytes are a serialized
>> Java DoFn from the user's staged jars is more difficult.
>>
>> However, supporting portable execution alongside this specialization is a
>> lot of maintenance overhead and as Luke points out causes other user pain
>> having nothing to do with cross-language requirements. I would definitely
>> reset our perspective to take portable black-box execution as the baseline.
>>
>> Kenn
>>
>> [1]
>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto
>>
>> On Thu, Mar 8, 2018 at 11:18 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> I ran some very pessimistic pipelines that were shuffle heavy (Random KV
>>> -> GBK -> IdentityDoFn) and found that the performance overhead was 15%
>>> when executed with Dataflow. This is a while back and there was a lot of
>>> inefficiencies due to coder encode/decode cycles and based upon profiling
>>> information surmised the with some work to reduce the amount of times that
>>> byte[] are copied that this could get reduced to about 8%. I can't say how
>>> this will impact Flink as its a different execution engine but we should
>>> gather data first.
>>>
>>> On Thu, Mar 8, 2018 at 11:10 AM, Thomas Weise <t...@apache.org> wrote:
>>>
>>>> Performance, due to the extra gRPC hop.
>>>>
>>>>
>>>> On Thu, Mar 8, 2018 at 11:08 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> The goal is to use containers (and similar technologies) in the
>>>>> future. It really hinders pipeline portability between runners if you also
>>>>> have to deal with the dependency conflicts between 
>>>>> Flink/Dataflow/Spark/...
>>>>> execution runtimes.
>>>>>
>>>>> What kinds of penalty are you referring to (perf, user complexity,
>>>>> ...)?
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Mar 8, 2018 at 11:02 AM, Thomas Weise <t...@apache.org> wrote:
>>>>>
>>>>>> I'm curious if pipelines that are exclusively Java will be executed
>>>>>> (when running on Flink or other JVM based runnner) in separate harness
>>>>>> containers also? This would impose a significant penalty compared to the
>>>>>> current execution model. Will this be something the user can control?
>>>>>>
>>>>>> Thanks,
>>>>>> Thomas
>>>>>>
>>>>>>
>>>>>> On Wed, Mar 7, 2018 at 2:09 PM, Aljoscha Krettek <aljos...@apache.org
>>>>>> > wrote:
>>>>>>
>>>>>>> @Axel I assigned https://issues.apache.org/jira/browse/BEAM-2588 to
>>>>>>> you. It might make sense to also grab other issues that you're already
>>>>>>> working on.
>>>>>>>
>>>>>>>
>>>>>>> On 7. Mar 2018, at 21:18, Aljoscha Krettek <aljos...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Cool, so we had the same ideas. I think this indicates that we're
>>>>>>> not completely on the wrong track with this! ;-)
>>>>>>>
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On 7. Mar 2018, at 21:14, Thomas Weise <t...@apache.org> wrote:
>>>>>>>
>>>>>>> Ben,
>>>>>>>
>>>>>>> Looks like we hit the send button at the same time. Is the plan the
>>>>>>> to derive the Flink implementation of the various execution services 
>>>>>>> from
>>>>>>> those under org.apache.beam.runners.fnexecution ?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> On Wed, Mar 7, 2018 at 11:02 AM, Thomas Weise <t...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> What's the plan for the endpoints that the Flink operator needs to
>>>>>>>> provide (control/data plane, state, logging)? Is the intention to 
>>>>>>>> provide
>>>>>>>> base implementations that can be shared across runners and then 
>>>>>>>> implement
>>>>>>>> the Flink specific parts on top of it? Has work started on those?
>>>>>>>>
>>>>>>>> If there are subtasks ready to be taken up I would be interested.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Thomas
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Mar 7, 2018 at 9:35 AM, Ben Sidhom <sid...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Yes, Axel has started work on such a shim.
>>>>>>>>>
>>>>>>>>> Our plan in the short term is to keep the old FlinkRunner around
>>>>>>>>> and to call into it to process jobs from the job service itself. That 
>>>>>>>>> way
>>>>>>>>> we can keep the non-portable runner fully-functional while working on
>>>>>>>>> portability. Eventually, I think it makes sense for this to go away, 
>>>>>>>>> but we
>>>>>>>>> haven't given much thought to that. The translator layer will likely 
>>>>>>>>> stay
>>>>>>>>> the same, and the FlinkRunner bits are a relatively simple wrapper 
>>>>>>>>> around
>>>>>>>>> translation, so it should be simple enough to factor this out.
>>>>>>>>>
>>>>>>>>> Much of the service code from the Universal Local Runner (ULR)
>>>>>>>>> should be composed and reused with other runner implementations. 
>>>>>>>>> Thomas and
>>>>>>>>> Axel have more context around that.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Mar 7, 2018 at 8:47 AM Aljoscha Krettek <
>>>>>>>>> aljos...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Has anyone started on
>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-2588 (FlinkRunner
>>>>>>>>>> shim for serving Job API). If not I would start on that.
>>>>>>>>>>
>>>>>>>>>> My plan is to implement a FlinkJobService that implements 
>>>>>>>>>> JobServiceImplBase,
>>>>>>>>>> similar to ReferenceRunnerJobService. This would have a lot of the
>>>>>>>>>> functionality that FlinkRunner currently has. As a next step, I 
>>>>>>>>>> would add a
>>>>>>>>>> JobServiceRunner that can submit Pipelines to a JobService.
>>>>>>>>>>
>>>>>>>>>> For testing, I would probably add functionality that allows
>>>>>>>>>> spinning up a JobService in-process with the JobServiceRunner. I can
>>>>>>>>>> imagine for testing we could even eventually use something like:
>>>>>>>>>> "--runner=JobServiceRunner", "--streaming=true",
>>>>>>>>>> "--jobService=FlinkRunnerJobService".
>>>>>>>>>>
>>>>>>>>>> Once all of this is done, we only need the python component that
>>>>>>>>>> talks to the JobService to submit a pipeline.
>>>>>>>>>>
>>>>>>>>>> What do you think about the plan?
>>>>>>>>>>
>>>>>>>>>> Btw, I feel that the thing currently called Runner, i.e.
>>>>>>>>>> FlinkRunner will go way in the long run and we will have 
>>>>>>>>>> FlinkJobService,
>>>>>>>>>> SparkJobService and whatnot, what do you think?
>>>>>>>>>>
>>>>>>>>>> Aljoscha
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 9. Feb 2018, at 01:31, Ben Sidhom <sid...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hey all,
>>>>>>>>>>
>>>>>>>>>> We're working on getting the portability framework plumbed
>>>>>>>>>> through the Flink runner. The first iteration will likely only 
>>>>>>>>>> support
>>>>>>>>>> batch and will be limited in its deployment flexibility, but 
>>>>>>>>>> hopefully it
>>>>>>>>>> shouldn't be too painful to expand this.
>>>>>>>>>>
>>>>>>>>>> We have the start of a tracking doc here:
>>>>>>>>>> https://s.apache.org/portable-beam-on-flink.
>>>>>>>>>>
>>>>>>>>>> We've documented the general deployment strategy here:
>>>>>>>>>> https://s.apache.org/portable-flink-runner-overview.
>>>>>>>>>>
>>>>>>>>>> Feel free to provide comments on the docs or jump in on any of
>>>>>>>>>> the referenced bugs.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -Ben
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -Ben
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>

Reply via email to