It was not a portable Flink runner.

Thanks all for the thoughts, I will create JIRAs, as suggested, with my
findings and send them out

On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax <[email protected]> wrote:

> Jozef did you use the portable Flink runner or the old one?
>
> Reuven
>
> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw <[email protected]>
> wrote:
>
>> Thanks for starting this investigation. As mentioned, most of the work
>> to date has been on feature parity, not performance parity, but we're
>> at the point that the latter should be tackled as well. Even if there
>> is a slight overhead (and there's talk about integrating more deeply
>> with the Flume DAG that could elide even that) I'd expect it should be
>> nowhere near the 3x that you're seeing. Aside from the timer issue,
>> sounds like the cloning via coders is is a huge drag that needs to be
>> addressed. I wonder if this is one of those cases where using the
>> portability framework could be a performance win (specifically, no
>> cloning would happen between operators of fused stages, and the
>> cloning between operators could be on the raw bytes[] (if needed at
>> all, because we know they wouldn't be mutated).
>>
>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles <[email protected]> wrote:
>> >
>> > Specifically, a lot of shared code assumes that repeatedly setting a
>> timer is nearly free / the same cost as determining whether or not to set
>> the timer. ReduceFnRunner has been refactored in a way so it would be very
>> easy to set the GC timer once per window that occurs in a bundle, but
>> there's probably some underlying inefficiency around why this isn't cheap
>> that would be a bigger win.
>> >
>> > Kenn
>> >
>> > On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <[email protected]> wrote:
>> >>
>> >> I think the short answer is that folks working on the BeamFlink runner
>> have mostly been focused on getting everything working, and so have not dug
>> into this performance too deeply. I suspect that there is low-hanging fruit
>> to optimize as a result.
>> >>
>> >> You're right that ReduceFnRunner schedules a timer for each element. I
>> think this code dates back to before Beam; on Dataflow timers are
>> identified by tag, so this simply overwrites the existing timer which is
>> very cheap in Dataflow. If it is not cheap on Flink, this might be
>> something to optimize.
>> >>
>> >> Reuven
>> >>
>> >> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <[email protected]>
>> wrote:
>> >>>
>> >>> Hello,
>> >>>
>> >>> I am interested in any knowledge or thoughts on what should be / is
>> an overhead of running Beam pipelines instead of pipelines written on "bare
>> runner". Is this something which is being tested or investigated by
>> community? Is there a consensus in what bounds should the overhead
>> typically be? I realise this is very runner specific, but certain things
>> are imposed also by SDK model itself.
>> >>>
>> >>> I tested simple streaming pipeline on Flink vs Beam-Flink and found
>> very noticeable differences. I want to stress out, it was not a performance
>> test. Job does following:
>> >>>
>> >>> Read Kafka -> Deserialize to Proto -> Filter deserialisation errors
>> -> Reshuffle -> Report counter.inc() to metrics for throughput
>> >>>
>> >>> Both jobs had same configuration and same state backed with same
>> checkpointing strategy. What I noticed from few simple test runs:
>> >>>
>> >>> * first run on Flink 1.5.0 from CPU profiles on one worker I have
>> found out that ~50% time was spend either on removing timers from
>> HeapInternalTimerService or in java.io.ByteArrayOutputStream from
>> CoderUtils.clone()
>> >>>
>> >>> * problem with timer delete was addressed by FLINK-9423. I have
>> retested on Flink 1.7.2 and there was not much time is spend in timer
>> delete now, but root cause was not removed. It still remains that timers
>> are frequently registered and removed ( I believe from
>> ReduceFnRunner.scheduleGarbageCollectionTimer() in which case it is called
>> per processed element? )  which is noticeable in GC activity, Heap and
>> State ...
>> >>>
>> >>> * in Flink I use FileSystem state backed which keeps state in memory
>> CopyOnWriteStateTable which after some time is full of PaneInfo objects.
>> Maybe they come from PaneInfoTracker activity
>> >>>
>> >>> * Coder clone is painfull. Pure Flink job does copy between operators
>> too, in my case it is via Kryo.copy() but this is not noticeable in CPU
>> profile. Kryo.copy() does copy on object level not boject -> bytes ->
>> object which is cheaper
>> >>>
>> >>> Overall, my observation is that pure Flink can be roughly 3x faster.
>> >>>
>> >>> I do not know what I am trying to achieve here :) Probably just start
>> a discussion and collect thoughts and other experiences on the cost of
>> running some data processing on Beam and particular runner.
>> >>>
>>
>

Reply via email to