Thanks for the JIRA issues Jozef!

So the feature in Flink is operator chaining and Flink per default initiate 
copy of input elements. In case of Beam coders copy seems to be more noticable 
than native Flink.

Copying between chained operators can be turned off in the FlinkPipelineOptions (if you know what you're doing). Beam coders should not be slower than Flink's. They are simple wrapped. It seems Kryo is simply slower which we could fix by providing more type hints to Flink.

-Max

On 02.05.19 13:15, Robert Bradshaw wrote:
Thanks for filing those.

As for how not doing a copy is "safe," it's not really. Beam simply
asserts that you MUST NOT mutate your inputs (and direct runners,
which are used during testing, do perform extra copies and checks to
catch violations of this requirement).

On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek <[email protected]> wrote:

I have created
https://issues.apache.org/jira/browse/BEAM-7204
https://issues.apache.org/jira/browse/BEAM-7206

to track these topics further

On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek <[email protected]> wrote:



On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles <[email protected]> wrote:



On Tue, Apr 30, 2019, 07:05 Reuven Lax <[email protected]> wrote:

In that case, Robert's point is quite valid. The old Flink runner I believe had 
no knowledge of fusion, which was known to make it extremely slow. A lot of 
work went into making the portable runner fusion aware, so we don't need to 
round trip through coders on every ParDo.


The old Flink runner got fusion for free, since Flink does it. The new fusion 
in portability is because fusing the runner side of portability steps does not 
achieve real fusion


Aha, I see. So the feature in Flink is operator chaining and Flink per default 
initiate copy of input elements. In case of Beam coders copy seems to be more 
noticable than native Flink.
So do I get it right that in portable runner scenario, you do similar chaining via this 
"fusion of stages"? Curious here... how is it different from chaining so runner can be 
sure that not doing copy is "safe" with respect to user defined functions and their 
behaviour over inputs?



Reuven

On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek <[email protected]> wrote:

It was not a portable Flink runner.

Thanks all for the thoughts, I will create JIRAs, as suggested, with my 
findings and send them out

On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax <[email protected]> wrote:

Jozef did you use the portable Flink runner or the old one?

Reuven

On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw <[email protected]> wrote:

Thanks for starting this investigation. As mentioned, most of the work
to date has been on feature parity, not performance parity, but we're
at the point that the latter should be tackled as well. Even if there
is a slight overhead (and there's talk about integrating more deeply
with the Flume DAG that could elide even that) I'd expect it should be
nowhere near the 3x that you're seeing. Aside from the timer issue,
sounds like the cloning via coders is is a huge drag that needs to be
addressed. I wonder if this is one of those cases where using the
portability framework could be a performance win (specifically, no
cloning would happen between operators of fused stages, and the
cloning between operators could be on the raw bytes[] (if needed at
all, because we know they wouldn't be mutated).

On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles <[email protected]> wrote:

Specifically, a lot of shared code assumes that repeatedly setting a timer is 
nearly free / the same cost as determining whether or not to set the timer. 
ReduceFnRunner has been refactored in a way so it would be very easy to set the 
GC timer once per window that occurs in a bundle, but there's probably some 
underlying inefficiency around why this isn't cheap that would be a bigger win.

Kenn

On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <[email protected]> wrote:

I think the short answer is that folks working on the BeamFlink runner have 
mostly been focused on getting everything working, and so have not dug into 
this performance too deeply. I suspect that there is low-hanging fruit to 
optimize as a result.

You're right that ReduceFnRunner schedules a timer for each element. I think 
this code dates back to before Beam; on Dataflow timers are identified by tag, 
so this simply overwrites the existing timer which is very cheap in Dataflow. 
If it is not cheap on Flink, this might be something to optimize.

Reuven

On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <[email protected]> wrote:

Hello,

I am interested in any knowledge or thoughts on what should be / is an overhead of 
running Beam pipelines instead of pipelines written on "bare runner". Is this 
something which is being tested or investigated by community? Is there a consensus in 
what bounds should the overhead typically be? I realise this is very runner specific, but 
certain things are imposed also by SDK model itself.

I tested simple streaming pipeline on Flink vs Beam-Flink and found very 
noticeable differences. I want to stress out, it was not a performance test. 
Job does following:

Read Kafka -> Deserialize to Proto -> Filter deserialisation errors -> Reshuffle 
-> Report counter.inc() to metrics for throughput

Both jobs had same configuration and same state backed with same checkpointing 
strategy. What I noticed from few simple test runs:

* first run on Flink 1.5.0 from CPU profiles on one worker I have found out 
that ~50% time was spend either on removing timers from 
HeapInternalTimerService or in java.io.ByteArrayOutputStream from 
CoderUtils.clone()

* problem with timer delete was addressed by FLINK-9423. I have retested on 
Flink 1.7.2 and there was not much time is spend in timer delete now, but root 
cause was not removed. It still remains that timers are frequently registered 
and removed ( I believe from ReduceFnRunner.scheduleGarbageCollectionTimer() in 
which case it is called per processed element? )  which is noticeable in GC 
activity, Heap and State ...

* in Flink I use FileSystem state backed which keeps state in memory 
CopyOnWriteStateTable which after some time is full of PaneInfo objects. Maybe 
they come from PaneInfoTracker activity

* Coder clone is painfull. Pure Flink job does copy between operators too, in my case 
it is via Kryo.copy() but this is not noticeable in CPU profile. Kryo.copy() does 
copy on object level not boject -> bytes -> object which is cheaper

Overall, my observation is that pure Flink can be roughly 3x faster.

I do not know what I am trying to achieve here :) Probably just start a 
discussion and collect thoughts and other experiences on the cost of running 
some data processing on Beam and particular runner.

Reply via email to