It was not the portable Flink runner. Thanks, all, for the thoughts. I will create JIRAs with my findings, as suggested, and send them out.

On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax <[email protected]> wrote:

> Jozef did you use the portable Flink runner or the old one?
>
> Reuven
>
> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw <[email protected]>
> wrote:
>
>> Thanks for starting this investigation. As mentioned, most of the work
>> to date has been on feature parity, not performance parity, but we're
>> at the point that the latter should be tackled as well. Even if there
>> is a slight overhead (and there's talk about integrating more deeply
>> with the Flume DAG that could elide even that) I'd expect it should be
>> nowhere near the 3x that you're seeing. Aside from the timer issue,
>> it sounds like the cloning via coders is a huge drag that needs to be
>> addressed. I wonder if this is one of those cases where using the
>> portability framework could be a performance win (specifically, no
>> cloning would happen between operators of fused stages, and the
>> cloning between operators could be on the raw byte[] (if needed at
>> all, because we know they wouldn't be mutated)).
>>
>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles <[email protected]> wrote:
>> >
>> > Specifically, a lot of shared code assumes that repeatedly setting a
>> > timer is nearly free / the same cost as determining whether or not to
>> > set the timer. ReduceFnRunner has been refactored so that it would be
>> > very easy to set the GC timer once per window that occurs in a bundle,
>> > but there's probably some underlying inefficiency behind why this
>> > isn't cheap, and fixing that would be a bigger win.
>> >
>> > Kenn
>> >
>> > On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <[email protected]> wrote:
>> >>
>> >> I think the short answer is that folks working on the Beam Flink
>> >> runner have mostly been focused on getting everything working, and so
>> >> have not dug into this performance too deeply. I suspect that there
>> >> is low-hanging fruit to optimize as a result.
>> >>
>> >> You're right that ReduceFnRunner schedules a timer for each element.
>> >> I think this code dates back to before Beam; on Dataflow, timers are
>> >> identified by tag, so this simply overwrites the existing timer,
>> >> which is very cheap in Dataflow. If it is not cheap on Flink, this
>> >> might be something to optimize.
>> >>
>> >> Reuven
>> >>
>> >> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <[email protected]>
>> >> wrote:
>> >>>
>> >>> Hello,
>> >>>
>> >>> I am interested in any knowledge or thoughts on what the overhead
>> >>> of running Beam pipelines is, or should be, compared with pipelines
>> >>> written on the "bare runner". Is this something which is being
>> >>> tested or investigated by the community? Is there a consensus on
>> >>> what bounds the overhead should typically fall within? I realise
>> >>> this is very runner specific, but certain things are also imposed
>> >>> by the SDK model itself.
>> >>>
>> >>> I tested a simple streaming pipeline on Flink vs Beam-Flink and
>> >>> found very noticeable differences. I want to stress that it was not
>> >>> a performance test. The job does the following:
>> >>>
>> >>> Read Kafka -> Deserialize to Proto -> Filter deserialisation errors
>> >>> -> Reshuffle -> Report counter.inc() to metrics for throughput
>> >>>
>> >>> Both jobs had the same configuration and the same state backend
>> >>> with the same checkpointing strategy. What I noticed from a few
>> >>> simple test runs:
>> >>>
>> >>> * On the first run, on Flink 1.5.0, CPU profiles from one worker
>> >>> showed that ~50% of the time was spent either on removing timers
>> >>> from HeapInternalTimerService or in java.io.ByteArrayOutputStream
>> >>> from CoderUtils.clone()
>> >>>
>> >>> * The problem with timer deletion was addressed by FLINK-9423. I
>> >>> retested on Flink 1.7.2 and not much time is spent in timer
>> >>> deletion now, but the root cause was not removed. Timers are still
>> >>> frequently registered and removed (I believe from
>> >>> ReduceFnRunner.scheduleGarbageCollectionTimer(), in which case it is
>> >>> called per processed element?), which is noticeable in GC activity,
>> >>> heap and state ...
>> >>>
>> >>> * In Flink I use the FileSystem state backend, which keeps state in
>> >>> an in-memory CopyOnWriteStateTable that after some time is full of
>> >>> PaneInfo objects. Maybe they come from PaneInfoTracker activity
>> >>>
>> >>> * Coder clone is painful. A pure Flink job copies between operators
>> >>> too, in my case via Kryo.copy(), but this is not noticeable in the
>> >>> CPU profile. Kryo.copy() copies at the object level, not object ->
>> >>> bytes -> object, which is cheaper
>> >>>
>> >>> Overall, my observation is that pure Flink can be roughly 3x faster.
>> >>>
>> >>> I do not know what I am trying to achieve here :) Probably just to
>> >>> start a discussion and collect thoughts and other experiences on
>> >>> the cost of running some data processing on Beam and a particular
>> >>> runner.
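
To make the "object -> bytes -> object" versus object-level copy distinction from the thread concrete, here is a minimal sketch. It is not Beam's CoderUtils.clone() or Kryo.copy(): Java serialization stands in for a coder, a hand-written copy() method stands in for an object-level copier, and the Event class and both method names are hypothetical. It only shows that the round trip has to fill an intermediate ByteArrayOutputStream and then decode it, while the direct copy does not.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CloneSketch {
    /** Hypothetical element type, standing in for a deserialized record. */
    static final class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        final String key;
        final List<Long> values;

        Event(String key, List<Long> values) {
            this.key = key;
            this.values = values;
        }

        /** Object-level copy: a new list is allocated, no byte buffer is involved. */
        Event copy() {
            return new Event(key, new ArrayList<>(values));
        }
    }

    /**
     * Round-trip clone: object -> bytes -> object.
     * Java serialization is an assumption here, used only as a stand-in "coder".
     */
    static Event cloneViaBytes(Event e) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(e); // encode the whole element into the buffer
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            return (Event) in.readObject(); // decode it back into a fresh object
        }
    }

    public static void main(String[] args) throws Exception {
        Event original = new Event("k", new ArrayList<>(Arrays.asList(1L, 2L, 3L)));
        Event viaBytes = cloneViaBytes(original);
        Event viaCopy = original.copy();
        // Both clones carry the same data but do not share the list with the original.
        System.out.println(viaBytes.values.equals(original.values) && viaBytes.values != original.values);
        System.out.println(viaCopy.values.equals(original.values) && viaCopy.values != original.values);
    }
}
```

Both paths produce an independent copy; the difference is the extra encode, buffer, and decode work on the byte path, which is the kind of cost that would show up under ByteArrayOutputStream in a CPU profile.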

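The idea of setting the GC timer once per window per bundle, rather than once per element, can be sketched as follows. This is not ReduceFnRunner code: CountingTimers, setGcTimer, and the two process methods are hypothetical names, and windows are plain strings. The sketch assumes, as described in the thread, that setting a timer with the same tag overwrites the existing one, so both strategies leave the same timers active and differ only in how many calls reach the timer service.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class GcTimerSketch {
    /** Hypothetical timer service that only counts how often a timer is set. */
    static final class CountingTimers {
        int setCalls = 0;
        final Set<String> active = new HashSet<>();

        void setGcTimer(String window) {
            setCalls++;
            active.add(window); // same tag: the existing timer is overwritten
        }
    }

    /** Per-element scheduling: one setGcTimer call for every element. */
    static void processPerElement(List<String> elementWindows, CountingTimers timers) {
        for (String window : elementWindows) {
            timers.setGcTimer(window);
        }
    }

    /** Per-bundle scheduling: one setGcTimer call per distinct window in the bundle. */
    static void processPerBundle(List<String> elementWindows, CountingTimers timers) {
        Set<String> seenInBundle = new HashSet<>();
        for (String window : elementWindows) {
            if (seenInBundle.add(window)) { // true only the first time this window is seen
                timers.setGcTimer(window);
            }
        }
    }

    public static void main(String[] args) {
        // Each entry is the window of one element in the bundle.
        List<String> bundle = Arrays.asList("w1", "w1", "w2", "w1", "w2");

        CountingTimers perElement = new CountingTimers();
        processPerElement(bundle, perElement);

        CountingTimers perBundle = new CountingTimers();
        processPerBundle(bundle, perBundle);

        System.out.println(perElement.setCalls + " vs " + perBundle.setCalls); // prints "5 vs 2"
    }
}
```

Whether this deduplication or a cheaper timer implementation in the runner is the better fix is the open question raised in the thread; the sketch only shows the call-count difference.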