On Thu, May 2, 2019 at 3:41 PM Maximilian Michels <m...@apache.org> wrote:
> Thanks for the JIRA issues Jozef!
>
> > So the feature in Flink is operator chaining and Flink by default
> > initiates a copy of input elements. With Beam coders the copy seems
> > to be more noticeable than in native Flink.
>
> Copying between chained operators can be turned off in the
> FlinkPipelineOptions (if you know what you're doing).

Yes, I know that it can be instructed to reuse objects (if you are
referring to this). I am just not sure I want to open this door in
general :) But it is interesting to learn that with portability this
will be turned on by default. Quite an important finding imho.

> Beam coders should not be slower than Flink's. They are simply
> wrapped. It seems Kryo is simply slower, which we could fix by
> providing more type hints to Flink.

I am not sure what you are referring to here. What do you mean, Kryo is
simply slower ... Beam Kryo or Flink Kryo or?

> -Max
>
> On 02.05.19 13:15, Robert Bradshaw wrote:
> > Thanks for filing those.
> >
> > As for how not doing a copy is "safe," it's not really. Beam simply
> > asserts that you MUST NOT mutate your inputs (and direct runners,
> > which are used during testing, do perform extra copies and checks to
> > catch violations of this requirement).
> >
> > On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >>
> >> I have created
> >> https://issues.apache.org/jira/browse/BEAM-7204
> >> https://issues.apache.org/jira/browse/BEAM-7206
> >>
> >> to track these topics further
> >>
> >> On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >>>
> >>> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles <k...@apache.org> wrote:
> >>>>
> >>>> On Tue, Apr 30, 2019, 07:05 Reuven Lax <re...@google.com> wrote:
> >>>>>
> >>>>> In that case, Robert's point is quite valid. The old Flink runner
> >>>>> I believe had no knowledge of fusion, which was known to make it
> >>>>> extremely slow.
> >>>>> A lot of work went into making the portable runner fusion aware,
> >>>>> so we don't need to round trip through coders on every ParDo.
> >>>>
> >>>> The old Flink runner got fusion for free, since Flink does it. The
> >>>> new fusion in portability is because fusing the runner side of
> >>>> portability steps does not achieve real fusion.
> >>>
> >>> Aha, I see. So the feature in Flink is operator chaining and Flink
> >>> by default initiates a copy of input elements. With Beam coders the
> >>> copy seems to be more noticeable than in native Flink.
> >>>
> >>> So do I get it right that in the portable runner scenario, you do
> >>> similar chaining via this "fusion of stages"? Curious here... how is
> >>> it different from chaining, so that the runner can be sure that not
> >>> doing a copy is "safe" with respect to user defined functions and
> >>> their behaviour over inputs?
> >>>>>
> >>>>> Reuven
> >>>>>
> >>>>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >>>>>>
> >>>>>> It was not a portable Flink runner.
> >>>>>>
> >>>>>> Thanks all for the thoughts, I will create JIRAs, as suggested,
> >>>>>> with my findings and send them out
> >>>>>>
> >>>>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax <re...@google.com> wrote:
> >>>>>>>
> >>>>>>> Jozef did you use the portable Flink runner or the old one?
> >>>>>>>
> >>>>>>> Reuven
> >>>>>>>
> >>>>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw <rober...@google.com> wrote:
> >>>>>>>>
> >>>>>>>> Thanks for starting this investigation. As mentioned, most of
> >>>>>>>> the work to date has been on feature parity, not performance
> >>>>>>>> parity, but we're at the point that the latter should be
> >>>>>>>> tackled as well. Even if there is a slight overhead (and
> >>>>>>>> there's talk about integrating more deeply with the Flume DAG
> >>>>>>>> that could elide even that) I'd expect it should be nowhere
> >>>>>>>> near the 3x that you're seeing.
> >>>>>>>> Aside from the timer issue, sounds like the cloning via coders
> >>>>>>>> is a huge drag that needs to be addressed. I wonder if this is
> >>>>>>>> one of those cases where using the portability framework could
> >>>>>>>> be a performance win (specifically, no cloning would happen
> >>>>>>>> between operators of fused stages, and the cloning between
> >>>>>>>> operators could be on the raw byte[] (if needed at all, because
> >>>>>>>> we know they wouldn't be mutated)).
> >>>>>>>>
> >>>>>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles <k...@apache.org> wrote:
> >>>>>>>>>
> >>>>>>>>> Specifically, a lot of shared code assumes that repeatedly
> >>>>>>>>> setting a timer is nearly free / the same cost as determining
> >>>>>>>>> whether or not to set the timer. ReduceFnRunner has been
> >>>>>>>>> refactored in a way so it would be very easy to set the GC
> >>>>>>>>> timer once per window that occurs in a bundle, but there's
> >>>>>>>>> probably some underlying inefficiency around why this isn't
> >>>>>>>>> cheap that would be a bigger win.
> >>>>>>>>>
> >>>>>>>>> Kenn
> >>>>>>>>>
> >>>>>>>>> On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <re...@google.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> I think the short answer is that folks working on the Beam
> >>>>>>>>>> Flink runner have mostly been focused on getting everything
> >>>>>>>>>> working, and so have not dug into this performance too
> >>>>>>>>>> deeply. I suspect that there is low-hanging fruit to
> >>>>>>>>>> optimize as a result.
> >>>>>>>>>>
> >>>>>>>>>> You're right that ReduceFnRunner schedules a timer for each
> >>>>>>>>>> element. I think this code dates back to before Beam; on
> >>>>>>>>>> Dataflow timers are identified by tag, so this simply
> >>>>>>>>>> overwrites the existing timer, which is very cheap in
> >>>>>>>>>> Dataflow. If it is not cheap on Flink, this might be
> >>>>>>>>>> something to optimize.
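To make the tag-overwrite point above concrete, here is a toy sketch in plain Java (not Beam, Flink, or Dataflow code; all names are made up): when timers are addressed by tag, re-setting the window GC timer for every element is a single O(1) map overwrite that leaves one pending timer, whereas a heap-based timer service pays a delete plus re-insert each time.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a tag-addressed timer service: setting a timer for an
// existing tag simply overwrites the previous deadline.
public class TagTimerSketch {
    private final Map<String, Long> timersByTag = new HashMap<>();

    // Re-setting a tag is one map put; no delete/re-insert needed.
    public void setTimer(String tag, long deadlineMillis) {
        timersByTag.put(tag, deadlineMillis);
    }

    public Long deadlineFor(String tag) {
        return timersByTag.get(tag);
    }

    public int pendingTimers() {
        return timersByTag.size();
    }

    public static void main(String[] args) {
        TagTimerSketch timers = new TagTimerSketch();
        // Re-setting the same window-GC tag per element leaves one timer.
        for (long i = 0; i < 1000; i++) {
            timers.setTimer("window-gc", i);
        }
        System.out.println(timers.pendingTimers());       // 1
        System.out.println(timers.deadlineFor("window-gc")); // 999, last deadline wins
    }
}
```

Under that model the per-element re-set is harmless; the cost Jozef observed only appears when each set has to remove an entry from a timer heap first.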
> >>>>>>>>>>
> >>>>>>>>>> Reuven
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hello,
> >>>>>>>>>>>
> >>>>>>>>>>> I am interested in any knowledge or thoughts on what the
> >>>>>>>>>>> overhead of running Beam pipelines should be / is, compared
> >>>>>>>>>>> to pipelines written on a "bare runner". Is this something
> >>>>>>>>>>> which is being tested or investigated by the community? Is
> >>>>>>>>>>> there a consensus on what bounds the overhead should
> >>>>>>>>>>> typically fall within? I realise this is very runner
> >>>>>>>>>>> specific, but certain things are imposed also by the SDK
> >>>>>>>>>>> model itself.
> >>>>>>>>>>>
> >>>>>>>>>>> I tested a simple streaming pipeline on Flink vs Beam-Flink
> >>>>>>>>>>> and found very noticeable differences. I want to stress
> >>>>>>>>>>> that it was not a performance test. The job does the
> >>>>>>>>>>> following:
> >>>>>>>>>>>
> >>>>>>>>>>> Read Kafka -> Deserialize to Proto -> Filter deserialisation
> >>>>>>>>>>> errors -> Reshuffle -> Report counter.inc() to metrics for
> >>>>>>>>>>> throughput
> >>>>>>>>>>>
> >>>>>>>>>>> Both jobs had the same configuration and the same state
> >>>>>>>>>>> backend with the same checkpointing strategy. What I
> >>>>>>>>>>> noticed from a few simple test runs:
> >>>>>>>>>>>
> >>>>>>>>>>> * on the first run, on Flink 1.5.0, CPU profiles on one
> >>>>>>>>>>> worker showed that ~50% of the time was spent either on
> >>>>>>>>>>> removing timers from HeapInternalTimerService or in
> >>>>>>>>>>> java.io.ByteArrayOutputStream from CoderUtils.clone()
> >>>>>>>>>>>
> >>>>>>>>>>> * the problem with timer deletes was addressed by
> >>>>>>>>>>> FLINK-9423. I retested on Flink 1.7.2 and not much time is
> >>>>>>>>>>> spent in timer deletes now, but the root cause was not
> >>>>>>>>>>> removed. It still remains that timers are frequently
> >>>>>>>>>>> registered and removed (I believe from
> >>>>>>>>>>> ReduceFnRunner.scheduleGarbageCollectionTimer(), in which
> >>>>>>>>>>> case it is called per processed element?), which is
> >>>>>>>>>>> noticeable in GC activity, Heap and State ...
> >>>>>>>>>>>
> >>>>>>>>>>> * in Flink I use the FileSystem state backend, which keeps
> >>>>>>>>>>> state in an in-memory CopyOnWriteStateTable that after some
> >>>>>>>>>>> time is full of PaneInfo objects. Maybe they come from
> >>>>>>>>>>> PaneInfoTracker activity
> >>>>>>>>>>>
> >>>>>>>>>>> * Coder clone is painful. A pure Flink job does copy between
> >>>>>>>>>>> operators too, in my case via Kryo.copy(), but this is not
> >>>>>>>>>>> noticeable in the CPU profile. Kryo.copy() does the copy on
> >>>>>>>>>>> the object level, not object -> bytes -> object, which is
> >>>>>>>>>>> cheaper
> >>>>>>>>>>>
> >>>>>>>>>>> Overall, my observation is that pure Flink can be roughly 3x
> >>>>>>>>>>> faster.
> >>>>>>>>>>>
> >>>>>>>>>>> I do not know what I am trying to achieve here :) Probably
> >>>>>>>>>>> just start a discussion and collect thoughts and other
> >>>>>>>>>>> experiences on the cost of running some data processing on
> >>>>>>>>>>> Beam and a particular runner.
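To make the copy-cost discussion in the thread concrete, here is a self-contained sketch (plain JDK serialization standing in for a Beam coder; the `Event` type and method names are made up) contrasting the object -> bytes -> object round trip that a coder clone performs with plain reference passing, which is what object reuse amounts to, and why the latter is only safe if inputs are never mutated:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CopySketch {
    // Stand-in for a pipeline element.
    static class Event implements Serializable {
        long count;
        Event(long count) { this.count = count; }
    }

    // Byte round trip: serialize to byte[] and deserialize a fresh object.
    // This is the expensive style of copy the thread attributes to coders.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T byteRoundTripClone(T value) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Event original = new Event(1);

        Event cloned = byteRoundTripClone(original);
        cloned.count = 99;                  // safe: the copy is independent
        System.out.println(original.count); // 1

        Event reused = original;            // "object reuse": no copy at all
        reused.count = 99;                  // unsafe if someone still reads original
        System.out.println(original.count); // 99
    }
}
```

This is why Beam can only skip the copy if it asserts, as Robert says above, that user code MUST NOT mutate its inputs.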
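Kenn's suggestion about ReduceFnRunner can also be sketched: instead of re-setting the window GC timer for every element, remember which windows have already been seen in the current bundle and register the timer only on first sight. A toy illustration (made-up names, not the actual ReduceFnRunner API):

```java
import java.util.HashSet;
import java.util.Set;

// Toy sketch of "set the GC timer once per window per bundle".
public class GcTimerPerBundle {
    private final Set<String> windowsThisBundle = new HashSet<>();
    private int timerSetCalls = 0;

    // Called at the start of every bundle.
    public void startBundle() {
        windowsThisBundle.clear();
    }

    // Called once per element; the string stands in for the element's window.
    public void processElement(String window) {
        // Only the first element of each window in this bundle registers a
        // timer; subsequent elements hit the set and skip the registration.
        if (windowsThisBundle.add(window)) {
            timerSetCalls++; // real code would (re)set the window GC timer here
        }
    }

    public int timerSetCalls() {
        return timerSetCalls;
    }

    public static void main(String[] args) {
        GcTimerPerBundle runner = new GcTimerPerBundle();
        runner.startBundle();
        // 1000 elements in the same window: one timer registration, not 1000.
        for (int i = 0; i < 1000; i++) {
            runner.processElement("window-0");
        }
        System.out.println(runner.timerSetCalls()); // 1
    }
}
```

Whether this helps in practice depends on how expensive a redundant timer re-set actually is in the runner, which is exactly the underlying inefficiency Kenn points at.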