I think the short answer is that folks working on the Beam Flink runner have
mostly been focused on getting everything working, and so have not dug into
performance too deeply. I suspect there is low-hanging fruit to optimize as
a result.

You're right that ReduceFnRunner schedules a timer for each element. I
think this code dates back to before Beam; on Dataflow, timers are
identified by tag, so this simply overwrites the existing timer, which is
very cheap in Dataflow. If it is not cheap on Flink, this might be
something to optimize.
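To illustrate the difference, here is a minimal sketch (not the actual Beam or Flink API; class and method names are hypothetical) contrasting tag-keyed timers, where re-setting the same tag overwrites the previous entry, with a heap-based timer service, where every set enqueues a new entry that must later be found and deleted:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Sketch only: contrasts the two timer-registration models discussed above.
public class TimerOverwriteSketch {

    // Tag-keyed timers: one map entry per tag no matter how often it is
    // re-set, which is why overwriting the GC timer per element is cheap.
    static int tagKeyedCount(int elements) {
        Map<String, Long> timers = new HashMap<>();
        for (long i = 0; i < elements; i++) {
            timers.put("window-gc", i + 60_000L); // O(1) overwrite
        }
        return timers.size();
    }

    // Heap-based timers without dedup by tag: every set adds an entry, and
    // each re-set implies deleting the previous one from the queue (the
    // delete cost is what FLINK-9423 reduced).
    static int heapCount(int elements) {
        PriorityQueue<Long> timers = new PriorityQueue<>();
        for (long i = 0; i < elements; i++) {
            timers.add(i + 60_000L);
        }
        return timers.size();
    }

    public static void main(String[] args) {
        System.out.println(tagKeyedCount(1000)); // 1
        System.out.println(heapCount(1000));     // 1000
    }
}
```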

Reuven

On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> Hello,
>
> I am interested in any knowledge or thoughts on what the overhead of
> running Beam pipelines should be / is, compared to pipelines written on a
> "bare" runner. Is this something being tested or investigated by the
> community? Is there a consensus on what bounds the overhead should
> typically fall within? I realise this is very runner-specific, but certain
> things are also imposed by the SDK model itself.
>
> I tested a simple streaming pipeline on Flink vs Beam-on-Flink and found
> very noticeable differences. I want to stress that it was not a performance
> test. The job does the following:
>
> Read Kafka -> Deserialize to Proto -> Filter deserialisation errors ->
> Reshuffle -> Report counter.inc() to metrics for throughput
>
> Both jobs had the same configuration and the same state backend with the
> same checkpointing strategy. What I noticed from a few simple test runs:
>
> * In the first run, on Flink 1.5.0, CPU profiles on one worker showed
> that ~50% of the time was spent either removing timers
> from HeapInternalTimerService or in java.io.ByteArrayOutputStream from
> CoderUtils.clone()
>
> * The timer-delete problem was addressed by FLINK-9423. I retested on
> Flink 1.7.2 and not much time is spent in timer deletes now, but the root
> cause was not removed. It still remains that timers are frequently
> registered and removed ( I believe
> from ReduceFnRunner.scheduleGarbageCollectionTimer(), in which case it is
> called per processed element? ), which is noticeable in GC activity, heap,
> and state ...
>
> * In Flink I use the FileSystem state backend, which keeps state in an
> in-memory CopyOnWriteStateTable that after some time is full of PaneInfo
> objects. Maybe they come from PaneInfoTracker activity
>
> * Coder clone is painful. A pure Flink job does copy between operators
> too, in my case via Kryo.copy(), but this is not noticeable in the CPU
> profile. Kryo.copy() copies at the object level, not object -> bytes ->
> object, which is cheaper
>
> Overall, my observation is that pure Flink can be roughly 3x faster.
>
> I am not sure exactly what I am trying to achieve here :) Probably just to
> start a discussion and collect thoughts and other experiences on the cost
> of running some data processing on Beam and a particular runner.
>
>
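The coder-clone cost described above can be sketched as follows. This is an illustration, not the actual Beam code: Java serialization stands in for a Beam coder, and the class and method names are hypothetical. The point is that a bytes-based clone (object -> bytes -> object, as in CoderUtils.clone()) allocates a byte buffer and rebuilds the whole object graph, whereas an object-level copy like Kryo.copy() avoids the byte roundtrip entirely:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch only: shows the shape of a bytes-based clone and why it is
// costlier than an object-level copy.
public class CloneCostSketch {

    // Byte-roundtrip clone: serialize into a fresh byte buffer, then
    // deserialize into a brand-new object graph. Both steps allocate,
    // which shows up as GC pressure and ByteArrayOutputStream time.
    static <T extends Serializable> T byteClone(T value) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        String original = "event-payload";
        String copy = byteClone(original);
        System.out.println(copy.equals(original)); // true
    }
}
```

An object-level copy would instead walk the object graph directly and copy fields, skipping the intermediate byte array, which is the difference the profile above is picking up.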
