On Thu, May 2, 2019 at 3:41 PM Maximilian Michels <m...@apache.org> wrote:

> Thanks for the JIRA issues Jozef!
>
> > So the feature in Flink is operator chaining and Flink per default
> initiate copy of input elements. In case of Beam coders copy seems to be
> more noticable than native Flink.
>
> Copying between chained operators can be turned off in the
> FlinkPipelineOptions (if you know what you're doing).


Yes, I know that it can be instracted to reuse objects (if you are
referring to this). I am just not sure I want to open this door in general
:)
But it is interesting to learn, that with portability, this will be turned
On per default. Quite important finding imho.


> Beam coders should
> not be slower than Flink's. They are simple wrapped. It seems Kryo is
> simply slower which we could fix by providing more type hints to Flink.
>

I am not sure what are you referring to here. What do you mean Kryo is
simply slower ... Beam Kryo or Flink Kryo or?


> -Max
>
> On 02.05.19 13:15, Robert Bradshaw wrote:
> > Thanks for filing those.
> >
> > As for how not doing a copy is "safe," it's not really. Beam simply
> > asserts that you MUST NOT mutate your inputs (and direct runners,
> > which are used during testing, do perform extra copies and checks to
> > catch violations of this requirement).
> >
> > On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
> >>
> >> I have created
> >> https://issues.apache.org/jira/browse/BEAM-7204
> >> https://issues.apache.org/jira/browse/BEAM-7206
> >>
> >> to track these topics further
> >>
> >> On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
> >>>
> >>>
> >>>
> >>> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles <k...@apache.org>
> wrote:
> >>>>
> >>>>
> >>>>
> >>>> On Tue, Apr 30, 2019, 07:05 Reuven Lax <re...@google.com> wrote:
> >>>>>
> >>>>> In that case, Robert's point is quite valid. The old Flink runner I
> believe had no knowledge of fusion, which was known to make it extremely
> slow. A lot of work went into making the portable runner fusion aware, so
> we don't need to round trip through coders on every ParDo.
> >>>>
> >>>>
> >>>> The old Flink runner got fusion for free, since Flink does it. The
> new fusion in portability is because fusing the runner side of portability
> steps does not achieve real fusion
> >>>
> >>>
> >>> Aha, I see. So the feature in Flink is operator chaining and Flink per
> default initiate copy of input elements. In case of Beam coders copy seems
> to be more noticable than native Flink.
> >>> So do I get it right that in portable runner scenario, you do similar
> chaining via this "fusion of stages"? Curious here... how is it different
> from chaining so runner can be sure that not doing copy is "safe" with
> respect to user defined functions and their behaviour over inputs?
> >>>
> >>>>>
> >>>>>
> >>>>> Reuven
> >>>>>
> >>>>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
> >>>>>>
> >>>>>> It was not a portable Flink runner.
> >>>>>>
> >>>>>> Thanks all for the thoughts, I will create JIRAs, as suggested,
> with my findings and send them out
> >>>>>>
> >>>>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax <re...@google.com>
> wrote:
> >>>>>>>
> >>>>>>> Jozef did you use the portable Flink runner or the old one?
> >>>>>>>
> >>>>>>> Reuven
> >>>>>>>
> >>>>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw <
> rober...@google.com> wrote:
> >>>>>>>>
> >>>>>>>> Thanks for starting this investigation. As mentioned, most of the
> work
> >>>>>>>> to date has been on feature parity, not performance parity, but
> we're
> >>>>>>>> at the point that the latter should be tackled as well. Even if
> there
> >>>>>>>> is a slight overhead (and there's talk about integrating more
> deeply
> >>>>>>>> with the Flume DAG that could elide even that) I'd expect it
> should be
> >>>>>>>> nowhere near the 3x that you're seeing. Aside from the timer
> issue,
> >>>>>>>> sounds like the cloning via coders is is a huge drag that needs
> to be
> >>>>>>>> addressed. I wonder if this is one of those cases where using the
> >>>>>>>> portability framework could be a performance win (specifically, no
> >>>>>>>> cloning would happen between operators of fused stages, and the
> >>>>>>>> cloning between operators could be on the raw bytes[] (if needed
> at
> >>>>>>>> all, because we know they wouldn't be mutated).
> >>>>>>>>
> >>>>>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles <k...@apache.org>
> wrote:
> >>>>>>>>>
> >>>>>>>>> Specifically, a lot of shared code assumes that repeatedly
> setting a timer is nearly free / the same cost as determining whether or
> not to set the timer. ReduceFnRunner has been refactored in a way so it
> would be very easy to set the GC timer once per window that occurs in a
> bundle, but there's probably some underlying inefficiency around why this
> isn't cheap that would be a bigger win.
> >>>>>>>>>
> >>>>>>>>> Kenn
> >>>>>>>>>
> >>>>>>>>> On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <re...@google.com>
> wrote:
> >>>>>>>>>>
> >>>>>>>>>> I think the short answer is that folks working on the BeamFlink
> runner have mostly been focused on getting everything working, and so have
> not dug into this performance too deeply. I suspect that there is
> low-hanging fruit to optimize as a result.
> >>>>>>>>>>
> >>>>>>>>>> You're right that ReduceFnRunner schedules a timer for each
> element. I think this code dates back to before Beam; on Dataflow timers
> are identified by tag, so this simply overwrites the existing timer which
> is very cheap in Dataflow. If it is not cheap on Flink, this might be
> something to optimize.
> >>>>>>>>>>
> >>>>>>>>>> Reuven
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <
> jozo.vil...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hello,
> >>>>>>>>>>>
> >>>>>>>>>>> I am interested in any knowledge or thoughts on what should be
> / is an overhead of running Beam pipelines instead of pipelines written on
> "bare runner". Is this something which is being tested or investigated by
> community? Is there a consensus in what bounds should the overhead
> typically be? I realise this is very runner specific, but certain things
> are imposed also by SDK model itself.
> >>>>>>>>>>>
> >>>>>>>>>>> I tested simple streaming pipeline on Flink vs Beam-Flink and
> found very noticeable differences. I want to stress out, it was not a
> performance test. Job does following:
> >>>>>>>>>>>
> >>>>>>>>>>> Read Kafka -> Deserialize to Proto -> Filter deserialisation
> errors -> Reshuffle -> Report counter.inc() to metrics for throughput
> >>>>>>>>>>>
> >>>>>>>>>>> Both jobs had same configuration and same state backed with
> same checkpointing strategy. What I noticed from few simple test runs:
> >>>>>>>>>>>
> >>>>>>>>>>> * first run on Flink 1.5.0 from CPU profiles on one worker I
> have found out that ~50% time was spend either on removing timers from
> HeapInternalTimerService or in java.io.ByteArrayOutputStream from
> CoderUtils.clone()
> >>>>>>>>>>>
> >>>>>>>>>>> * problem with timer delete was addressed by FLINK-9423. I
> have retested on Flink 1.7.2 and there was not much time is spend in timer
> delete now, but root cause was not removed. It still remains that timers
> are frequently registered and removed ( I believe from
> ReduceFnRunner.scheduleGarbageCollectionTimer() in which case it is called
> per processed element? )  which is noticeable in GC activity, Heap and
> State ...
> >>>>>>>>>>>
> >>>>>>>>>>> * in Flink I use FileSystem state backed which keeps state in
> memory CopyOnWriteStateTable which after some time is full of PaneInfo
> objects. Maybe they come from PaneInfoTracker activity
> >>>>>>>>>>>
> >>>>>>>>>>> * Coder clone is painfull. Pure Flink job does copy between
> operators too, in my case it is via Kryo.copy() but this is not noticeable
> in CPU profile. Kryo.copy() does copy on object level not boject -> bytes
> -> object which is cheaper
> >>>>>>>>>>>
> >>>>>>>>>>> Overall, my observation is that pure Flink can be roughly 3x
> faster.
> >>>>>>>>>>>
> >>>>>>>>>>> I do not know what I am trying to achieve here :) Probably
> just start a discussion and collect thoughts and other experiences on the
> cost of running some data processing on Beam and particular runner.
> >>>>>>>>>>>
>

Reply via email to