I have created https://issues.apache.org/jira/browse/BEAM-7204 and
https://issues.apache.org/jira/browse/BEAM-7206 to track these topics further.
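For reference, a minimal sketch of the coder round trip being tracked here. CoderUtils.clone() is Beam's real utility (it encodes the element to a byte[] and decodes a fresh object); the coder and element value are just illustrative:

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.CoderUtils;

public class CoderCloneSketch {
  public static void main(String[] args) throws CoderException {
    Coder<String> coder = StringUtf8Coder.of();
    String element = "some payload";

    // Beam's defensive copy is object -> byte[] -> object: encode into a
    // ByteArrayOutputStream, then decode a fresh instance. This round trip
    // is what shows up under CoderUtils.clone() in the profiles below.
    String copy = CoderUtils.clone(coder, element);

    System.out.println(element.equals(copy)); // true, but paid for in bytes
  }
}

Kryo.copy(), which the native Flink job below relies on, copies at the object level and skips the serialization round trip entirely, which is why it does not show up in the profiles the same way.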
On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek <[email protected]> wrote:

> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles <[email protected]> wrote:
>
>> On Tue, Apr 30, 2019, 07:05 Reuven Lax <[email protected]> wrote:
>>
>>> In that case, Robert's point is quite valid. The old Flink runner, I
>>> believe, had no knowledge of fusion, which was known to make it
>>> extremely slow. A lot of work went into making the portable runner
>>> fusion aware, so we don't need to round-trip through coders on every
>>> ParDo.
>>
>> The old Flink runner got fusion for free, since Flink does it. The new
>> fusion in portability exists because fusing the runner side of
>> portability steps does not achieve real fusion.
>
> Aha, I see. So the feature in Flink is operator chaining, and Flink by
> default makes a copy of input elements. With Beam coders the copy seems
> to be more noticeable than in native Flink.
> So do I get it right that in the portable runner scenario you do similar
> chaining via this "fusion of stages"? Curious here... how is it
> different from chaining, so that the runner can be sure that not doing a
> copy is "safe" with respect to user-defined functions and their
> behaviour over inputs?
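A note on the defensive-copy point above: native Flink can skip the copy between chained operators when object reuse is enabled, and I believe the Beam Flink runner exposes a similar knob via FlinkPipelineOptions#setObjectReuse. A minimal sketch of the native setting, with the caveat that it is only safe if user functions neither cache nor mutate input elements:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseSketch {
  public static void main(String[] args) {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // By default Flink defensively copies every record handed between
    // chained operators. Enabling object reuse skips that copy; it is only
    // safe when user functions do not cache or mutate input elements after
    // emitting them, which is exactly the "safety" question above.
    env.getConfig().enableObjectReuse();
  }
}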
>>> Reuven
>>>
>>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek <[email protected]> wrote:
>>>
>>>> It was not a portable Flink runner.
>>>>
>>>> Thanks all for the thoughts. I will create JIRAs, as suggested, with
>>>> my findings and send them out.
>>>>
>>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Jozef, did you use the portable Flink runner or the old one?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw <[email protected]> wrote:
>>>>>
>>>>>> Thanks for starting this investigation. As mentioned, most of the
>>>>>> work to date has been on feature parity, not performance parity, but
>>>>>> we're at the point that the latter should be tackled as well. Even
>>>>>> if there is a slight overhead (and there's talk about integrating
>>>>>> more deeply with the Flink DAG that could elide even that), I'd
>>>>>> expect it to be nowhere near the 3x that you're seeing. Aside from
>>>>>> the timer issue, it sounds like the cloning via coders is a huge
>>>>>> drag that needs to be addressed. I wonder if this is one of those
>>>>>> cases where using the portability framework could be a performance
>>>>>> win (specifically, no cloning would happen between operators of
>>>>>> fused stages, and the cloning between operators could be on the raw
>>>>>> byte[], if needed at all, because we know they wouldn't be mutated).
>>>>>>
>>>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles <[email protected]> wrote:
>>>>>>>
>>>>>>> Specifically, a lot of shared code assumes that repeatedly setting
>>>>>>> a timer is nearly free / the same cost as determining whether or
>>>>>>> not to set the timer. ReduceFnRunner has been refactored in a way
>>>>>>> that would make it very easy to set the GC timer once per window
>>>>>>> that occurs in a bundle, but there's probably some underlying
>>>>>>> inefficiency around why this isn't cheap that would be a bigger
>>>>>>> win.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <[email protected]> wrote:
>>>>>>>>
>>>>>>>> I think the short answer is that folks working on the Beam Flink
>>>>>>>> runner have mostly been focused on getting everything working, and
>>>>>>>> so have not dug into this performance too deeply. I suspect that
>>>>>>>> there is low-hanging fruit to optimize as a result.
>>>>>>>>
>>>>>>>> You're right that ReduceFnRunner schedules a timer for each
>>>>>>>> element. I think this code dates back to before Beam; on Dataflow
>>>>>>>> timers are identified by tag, so this simply overwrites the
>>>>>>>> existing timer, which is very cheap in Dataflow. If it is not
>>>>>>>> cheap on Flink, this might be something to optimize.
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I am interested in any knowledge or thoughts on what the overhead
>>>>>>>>> of running Beam pipelines should be, or is, compared to pipelines
>>>>>>>>> written on a "bare runner". Is this something that is being
>>>>>>>>> tested or investigated by the community? Is there a consensus on
>>>>>>>>> what bounds the overhead should typically stay within? I realise
>>>>>>>>> this is very runner specific, but certain things are also imposed
>>>>>>>>> by the SDK model itself.
>>>>>>>>>
>>>>>>>>> I tested a simple streaming pipeline on Flink vs Beam-Flink and
>>>>>>>>> found very noticeable differences. I want to stress that it was
>>>>>>>>> not a performance test. The job does the following:
>>>>>>>>>
>>>>>>>>> Read Kafka -> Deserialize to Proto -> Filter deserialisation
>>>>>>>>> errors -> Reshuffle -> Report counter.inc() to metrics for
>>>>>>>>> throughput
>>>>>>>>>
>>>>>>>>> Both jobs had the same configuration and the same state backend
>>>>>>>>> with the same checkpointing strategy. What I noticed from a few
>>>>>>>>> simple test runs:
>>>>>>>>>
>>>>>>>>> * On the first run, on Flink 1.5.0, CPU profiles on one worker
>>>>>>>>> showed that ~50% of the time was spent either on removing timers
>>>>>>>>> from HeapInternalTimerService or in java.io.ByteArrayOutputStream
>>>>>>>>> from CoderUtils.clone().
>>>>>>>>>
>>>>>>>>> * The problem with timer deletion was addressed by FLINK-9423. I
>>>>>>>>> retested on Flink 1.7.2 and not much time is spent in timer
>>>>>>>>> deletion now, but the root cause was not removed. It still
>>>>>>>>> remains that timers are frequently registered and removed (I
>>>>>>>>> believe from ReduceFnRunner.scheduleGarbageCollectionTimer(), in
>>>>>>>>> which case it is called per processed element?), which is
>>>>>>>>> noticeable in GC activity, heap and state...
>>>>>>>>>
>>>>>>>>> * In Flink I use the FileSystem state backend, which keeps state
>>>>>>>>> in an in-memory CopyOnWriteStateTable that after some time is
>>>>>>>>> full of PaneInfo objects. Maybe they come from PaneInfoTracker
>>>>>>>>> activity.
>>>>>>>>>
>>>>>>>>> * The coder clone is painful. The pure Flink job does copy
>>>>>>>>> between operators too, in my case via Kryo.copy(), but this is
>>>>>>>>> not noticeable in the CPU profile. Kryo.copy() copies at the
>>>>>>>>> object level, not object -> bytes -> object, which is cheaper.
>>>>>>>>>
>>>>>>>>> Overall, my observation is that pure Flink can be roughly 3x
>>>>>>>>> faster.
>>>>>>>>>
>>>>>>>>> I do not know what I am trying to achieve here :) Probably just
>>>>>>>>> to start a discussion and collect thoughts and other experiences
>>>>>>>>> on the cost of running some data processing on Beam and a
>>>>>>>>> particular runner.
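For completeness, a rough sketch of the shape of the job described above, in Beam Java with KafkaIO. The broker address and topic name are placeholders, and com.google.protobuf.Any stands in for the real generated event proto:

import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OverheadTestJob {

  /** "Deserialize to Proto -> Filter deserialisation errors". */
  static class ParseProto extends DoFn<KV<byte[], byte[]>, byte[]> {
    @ProcessElement
    public void process(ProcessContext c) {
      byte[] payload = c.element().getValue();
      try {
        Any.parseFrom(payload); // stand-in for the real generated proto class
        c.output(payload);
      } catch (InvalidProtocolBufferException e) {
        // Drop elements that fail to parse instead of failing the bundle.
      }
    }
  }

  /** "Report counter.inc() to metrics for throughput". */
  static class CountThroughput extends DoFn<byte[], Void> {
    private final Counter throughput = Metrics.counter("overhead-test", "throughput");

    @ProcessElement
    public void process(ProcessContext c) {
      throughput.inc();
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply("ReadKafka", KafkaIO.<byte[], byte[]>read()
            .withBootstrapServers("broker:9092") // placeholder
            .withTopic("events")                 // placeholder
            .withKeyDeserializer(ByteArrayDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withoutMetadata())
        .apply("DeserializeToProto", ParDo.of(new ParseProto()))
        .apply(Reshuffle.viaRandomKey())
        .apply("CountThroughput", ParDo.of(new CountThroughput()));
    p.run();
  }
}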

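Finally, on the timer churn discussed above, a hedged sketch of the "set the GC timer once per window per bundle" idea Kenn mentions. BoundedWindow is the real Beam type; the bundle-scoped guard itself is hypothetical, not something that exists in ReduceFnRunner today:

import java.util.HashSet;
import java.util.Set;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

/**
 * Hypothetical bundle-scoped guard: instead of re-registering the window GC
 * timer for every element, register it only for the first element of each
 * window seen in the current bundle.
 */
class GcTimerPerBundleGuard {
  private final Set<BoundedWindow> seenThisBundle = new HashSet<>();

  /** True only the first time this window appears in the current bundle. */
  boolean shouldScheduleGcTimer(BoundedWindow window) {
    return seenThisBundle.add(window);
  }

  /** Clear at finishBundle so the next bundle schedules timers again. */
  void reset() {
    seenThisBundle.clear();
  }
}

Since the GC timer for a window is at a fixed timestamp, re-setting it per element is redundant work; a guard like this would trade a small per-bundle set for many timer registrations, which is where the Flink profile above was spending its time.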