On Thu, May 2, 2019 at 3:41 PM Maximilian Michels <[email protected]> wrote:
Thanks for the JIRA issues Jozef!
> So the feature in Flink is operator chaining and Flink by default
> initiates a copy of input elements. In the case of Beam, the coder
> copy seems to be more noticeable than in native Flink.
Copying between chained operators can be turned off in the
FlinkPipelineOptions (if you know what you're doing).
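
E.g., a minimal sketch of flipping that switch (assuming the
objectReuse option on FlinkPipelineOptions, which maps to Flink's
object-reuse mode):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    // Skip defensive copies between chained operators. Only safe when
    // no user function mutates its inputs or previously emitted values.
    options.setObjectReuse(true);
    Pipeline p = Pipeline.create(options);
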
Yes, I know that it can be instructed to reuse objects (if you are
referring to this). I am just not sure I want to open this door in
general :)
But it is interesting to learn that with portability this will be
turned on by default. Quite an important finding, imho.
Beam coders should not be slower than Flink's. They are simply
wrapped. It seems Kryo is simply slower, which we could fix by
providing more type hints to Flink.
I am not sure what you are referring to here. What do you mean, Kryo
is simply slower ... Beam Kryo or Flink Kryo or?
-Max
On 02.05.19 13:15, Robert Bradshaw wrote:
> Thanks for filing those.
>
> As for how not doing a copy is "safe," it's not really. Beam simply
> asserts that you MUST NOT mutate your inputs (and direct runners,
> which are used during testing, do perform extra copies and checks to
> catch violations of this requirement).
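>
> A hypothetical sketch of the kind of violation those checks catch
> (element type and names made up):
>
>   PCollection<List<String>> tagged = input.apply(ParDo.of(
>       new DoFn<List<String>, List<String>>() {
>         @ProcessElement
>         public void process(
>             @Element List<String> e, OutputReceiver<List<String>> out) {
>           e.add("tag");   // MUST NOT: mutates the input element
>           out.output(e);  // the direct runner flags this; Flink may
>                           // have skipped the copy
>         }
>       }));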
>
> On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek <[email protected]> wrote:
>>
>> I have created
>> https://issues.apache.org/jira/browse/BEAM-7204
>> https://issues.apache.org/jira/browse/BEAM-7206
>>
>> to track these topics further
>>
>> On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek <[email protected]> wrote:
>>>
>>>
>>>
>>> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles <[email protected]> wrote:
>>>>
>>>>
>>>>
>>>> On Tue, Apr 30, 2019, 07:05 Reuven Lax <[email protected]> wrote:
>>>>>
>>>>> In that case, Robert's point is quite valid. The old Flink runner
>>>>> I believe had no knowledge of fusion, which was known to make it
>>>>> extremely slow. A lot of work went into making the portable runner
>>>>> fusion-aware, so we don't need to round-trip through coders on
>>>>> every ParDo.
>>>>
>>>>
>>>> The old Flink runner got fusion for free, since Flink does it.
>>>> The new fusion in portability exists because fusing the runner
>>>> side of portability steps does not achieve real fusion.
>>>
>>>
>>> Aha, I see. So the feature in Flink is operator chaining, and
>>> Flink by default initiates a copy of input elements. In the case of
>>> Beam, the coder copy seems to be more noticeable than in native
>>> Flink.
>>> So do I get it right that in the portable runner scenario you do
>>> similar chaining via this "fusion of stages"? Curious here... how is
>>> it different from chaining, such that the runner can be sure that
>>> not doing a copy is "safe" with respect to user-defined functions
>>> and their behaviour over inputs?
>>>
>>>>>
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek <[email protected]> wrote:
>>>>>>
>>>>>> It was not a portable Flink runner.
>>>>>>
>>>>>> Thanks all for the thoughts. I will create JIRAs, as suggested,
>>>>>> with my findings and send them out.
>>>>>>
>>>>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax <[email protected]> wrote:
>>>>>>>
>>>>>>> Jozef, did you use the portable Flink runner or the old one?
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Thanks for starting this investigation. As mentioned, most of
>>>>>>>> the work to date has been on feature parity, not performance
>>>>>>>> parity, but we're at the point that the latter should be
>>>>>>>> tackled as well. Even if there is a slight overhead (and
>>>>>>>> there's talk about integrating more deeply with the Flume DAG
>>>>>>>> that could elide even that), I'd expect it should be nowhere
>>>>>>>> near the 3x that you're seeing. Aside from the timer issue, it
>>>>>>>> sounds like the cloning via coders is a huge drag that needs to
>>>>>>>> be addressed. I wonder if this is one of those cases where
>>>>>>>> using the portability framework could be a performance win
>>>>>>>> (specifically, no cloning would happen between operators of
>>>>>>>> fused stages, and the cloning between stages could be done on
>>>>>>>> the raw byte[], if needed at all, because we know they wouldn't
>>>>>>>> be mutated).
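>>>>>>>>
>>>>>>>> (Sketch of that last idea: at a fusion boundary the element is
>>>>>>>> already encoded, so the "clone" could be just a byte copy,
>>>>>>>> e.g.
>>>>>>>>
>>>>>>>>   byte[] copy = Arrays.copyOf(encoded, encoded.length);
>>>>>>>>
>>>>>>>> with no decode/re-encode round trip, assuming the encoded form
>>>>>>>> is never mutated. "encoded" here is hypothetical.)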
>>>>>>>>
>>>>>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Specifically, a lot of shared code assumes that repeatedly
>>>>>>>>> setting a timer is nearly free / the same cost as determining
>>>>>>>>> whether or not to set the timer. ReduceFnRunner has been
>>>>>>>>> refactored in a way so it would be very easy to set the GC
>>>>>>>>> timer once per window that occurs in a bundle (see the sketch
>>>>>>>>> below), but there's probably some underlying inefficiency
>>>>>>>>> around why this isn't cheap, and fixing that would be a
>>>>>>>>> bigger win.
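>>>>>>>>>
>>>>>>>>> (A rough sketch of that once-per-bundle idea; names are
>>>>>>>>> hypothetical, not the actual ReduceFnRunner code:
>>>>>>>>>
>>>>>>>>>   // Cleared at the start of every bundle.
>>>>>>>>>   private final Set<BoundedWindow> gcTimerSetInBundle =
>>>>>>>>>       new HashSet<>();
>>>>>>>>>
>>>>>>>>>   private void maybeScheduleGcTimer(BoundedWindow window) {
>>>>>>>>>     // Set the GC timer the first time the window occurs in
>>>>>>>>>     // this bundle, instead of once per element.
>>>>>>>>>     if (gcTimerSetInBundle.add(window)) {
>>>>>>>>>       scheduleGarbageCollectionTimer(window);
>>>>>>>>>     }
>>>>>>>>>   }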
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>> I think the short answer is that folks working on the Beam
>>>>>>>>>> Flink runner have mostly been focused on getting everything
>>>>>>>>>> working, and so have not dug into this performance too
>>>>>>>>>> deeply. I suspect that there is low-hanging fruit to optimize
>>>>>>>>>> as a result.
>>>>>>>>>>
>>>>>>>>>> You're right that ReduceFnRunner schedules a timer for each
>>>>>>>>>> element. I think this code dates back to before Beam; on
>>>>>>>>>> Dataflow, timers are identified by tag, so this simply
>>>>>>>>>> overwrites the existing timer, which is very cheap in
>>>>>>>>>> Dataflow. If it is not cheap on Flink, this might be
>>>>>>>>>> something to optimize.
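>>>>>>>>>>
>>>>>>>>>> (Illustrated with the user-facing timer API, as a sketch
>>>>>>>>>> only: a timer id names one timer per key and window, so
>>>>>>>>>> re-setting it per element just moves the deadline:
>>>>>>>>>>
>>>>>>>>>>   @TimerId("gc")
>>>>>>>>>>   private final TimerSpec gcSpec =
>>>>>>>>>>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>>>>>>>>
>>>>>>>>>>   @ProcessElement
>>>>>>>>>>   public void process(
>>>>>>>>>>       ProcessContext c, BoundedWindow window,
>>>>>>>>>>       @TimerId("gc") Timer gc) {
>>>>>>>>>>     // Called for every element; each call overwrites the
>>>>>>>>>>     // previous deadline, which is cheap when the backend
>>>>>>>>>>     // stores timers by tag.
>>>>>>>>>>     gc.set(window.maxTimestamp());
>>>>>>>>>>   }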
>>>>>>>>>>
>>>>>>>>>> Reuven
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hello,
>>>>>>>>>>>
>>>>>>>>>>> I am interested in any knowledge or thoughts on what the
>>>>>>>>>>> overhead of running Beam pipelines should be / is, compared
>>>>>>>>>>> to pipelines written on a "bare runner". Is this something
>>>>>>>>>>> which is being tested or investigated by the community? Is
>>>>>>>>>>> there a consensus on what bounds the overhead should
>>>>>>>>>>> typically fall within? I realise this is very runner
>>>>>>>>>>> specific, but certain things are also imposed by the SDK
>>>>>>>>>>> model itself.
>>>>>>>>>>>
>>>>>>>>>>> I tested a simple streaming pipeline on Flink vs Beam-Flink
>>>>>>>>>>> and found very noticeable differences. I want to stress
>>>>>>>>>>> that it was not a performance test. The job does the
>>>>>>>>>>> following:
>>>>>>>>>>>
>>>>>>>>>>> Read Kafka -> Deserialize to Proto -> Filter
>>>>>>>>>>> deserialisation errors -> Reshuffle -> Report counter.inc()
>>>>>>>>>>> to metrics for throughput
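>>>>>>>>>>>
>>>>>>>>>>> In Beam terms, roughly (a sketch only; topic, config and
>>>>>>>>>>> DoFn names are made up):
>>>>>>>>>>>
>>>>>>>>>>>   p.apply(KafkaIO.<byte[], byte[]>read()
>>>>>>>>>>>           .withBootstrapServers(bootstrapServers)
>>>>>>>>>>>           .withTopic("events"))
>>>>>>>>>>>    .apply(ParDo.of(new DeserializeProtoFn()))  // wraps errors
>>>>>>>>>>>    .apply(Filter.by(r -> r.isSuccess()))       // drop bad records
>>>>>>>>>>>    .apply(Reshuffle.viaRandomKey())
>>>>>>>>>>>    .apply(ParDo.of(new ReportThroughputFn())); // counter.inc()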
>>>>>>>>>>>
>>>>>>>>>>> Both jobs had the same configuration and the same state
>>>>>>>>>>> backend with the same checkpointing strategy. What I noticed
>>>>>>>>>>> from a few simple test runs:
>>>>>>>>>>>
>>>>>>>>>>> * On a first run on Flink 1.5.0, CPU profiles on one worker
>>>>>>>>>>> showed that ~50% of the time was spent either on removing
>>>>>>>>>>> timers from HeapInternalTimerService or in
>>>>>>>>>>> java.io.ByteArrayOutputStream from CoderUtils.clone()
>>>>>>>>>>>
>>>>>>>>>>> * The problem with timer deletes was addressed by
>>>>>>>>>>> FLINK-9423. I retested on Flink 1.7.2 and not much time is
>>>>>>>>>>> spent in timer deletes now, but the root cause was not
>>>>>>>>>>> removed. It still remains that timers are frequently
>>>>>>>>>>> registered and removed (I believe from
>>>>>>>>>>> ReduceFnRunner.scheduleGarbageCollectionTimer(), in which
>>>>>>>>>>> case it is called per processed element?), which is
>>>>>>>>>>> noticeable in GC activity, Heap and State ...
>>>>>>>>>>>
>>>>>>>>>>> * In Flink I use the FileSystem state backend, which keeps
>>>>>>>>>>> state in an in-memory CopyOnWriteStateTable that after some
>>>>>>>>>>> time is full of PaneInfo objects. Maybe they come from
>>>>>>>>>>> PaneInfoTracker activity
>>>>>>>>>>>
>>>>>>>>>>> * Coder clone is painful. The pure Flink job does a copy
>>>>>>>>>>> between operators too, in my case via Kryo.copy(), but this
>>>>>>>>>>> is not noticeable in the CPU profile. Kryo.copy() copies at
>>>>>>>>>>> the object level rather than object -> bytes -> object,
>>>>>>>>>>> which makes it cheaper (see the sketch below)
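>>>>>>>>>>>
>>>>>>>>>>> (What the coder clone boils down to, sketched with Beam's
>>>>>>>>>>> CoderUtils helpers:
>>>>>>>>>>>
>>>>>>>>>>>   // object -> bytes -> object: a full serialisation round trip
>>>>>>>>>>>   byte[] bytes = CoderUtils.encodeToByteArray(coder, value);
>>>>>>>>>>>   T copy = CoderUtils.decodeFromByteArray(coder, bytes);
>>>>>>>>>>>
>>>>>>>>>>> whereas Kryo.copy(value) clones the object graph directly,
>>>>>>>>>>> without ever leaving object land.)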
>>>>>>>>>>>
>>>>>>>>>>> Overall, my observation is that pure Flink can be roughly
>>>>>>>>>>> 3x faster.
>>>>>>>>>>>
>>>>>>>>>>> I do not know what I am trying to achieve here :) Probably
>>>>>>>>>>> just to start a discussion and collect thoughts and other
>>>>>>>>>>> experiences on the cost of running some data processing on
>>>>>>>>>>> Beam and a particular runner.
>>>>>>>>>>>