Piotr Szczepanik created BEAM-7639:
--------------------------------------
Summary: Intermittent empty accumulator values in extractOutput of
Combine.perKey on Dataflow
Key: BEAM-7639
URL: https://issues.apache.org/jira/browse/BEAM-7639
Project: Beam
Issue Type: Bug
Components: runner-dataflow
Affects Versions: 2.10.0
Reporter: Piotr Szczepanik
We are using Spotify’s scio 0.7.2, which is built on Apache Beam 2.10.0, running on Google Dataflow in streaming mode with fixed windows.
With these versions we have observed strange and unfortunately intermittent behaviour in the Combine.perKey transform, which we use to perform a reduce operation, e.g. emitting the max value per key, or the value of the last element with that key in the window.
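For context, the shape of the failing part of the pipeline is roughly as follows. This is a minimal sketch in Beam's Java API; the helper name, window size, and key/value types are illustrative, not our exact job:

{code:java}
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class ReducePerKey {
  // Fixed-window a keyed stream, then reduce each key's elements in each
  // window to a single value with the supplied CombineFn (e.g. the
  // list-accumulator MaxFn sketched below).
  static <V, A, O> PCollection<KV<String, O>> reducePerKey(
      PCollection<KV<String, V>> input, Combine.CombineFn<V, A, O> fn) {
    return input
        .apply("FixedWindows",
            Window.<KV<String, V>>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply("ReducePerKey", Combine.perKey(fn));
  }
}
{code}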
Such reductions are implemented in scio as a Combine.CombineFn whose accumulator is created as an empty ArrayList; extractOutput performs the actual reduction and returns the output value.
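To make the pattern concrete, here is a minimal sketch of such a CombineFn for a max-per-key reduction, again in Beam's Java API with illustrative names and types, not scio's actual code:

{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.Combine;

// Sketch of the pattern: accumulate inputs in a list, reduce in extractOutput.
class MaxFn extends Combine.CombineFn<Long, List<Long>, Long> {
  @Override
  public List<Long> createAccumulator() {
    return new ArrayList<>(); // each key/window starts from an empty list
  }

  @Override
  public List<Long> addInput(List<Long> acc, Long input) {
    acc.add(input);
    return acc;
  }

  @Override
  public List<Long> mergeAccumulators(Iterable<List<Long>> accs) {
    List<Long> merged = new ArrayList<>();
    for (List<Long> acc : accs) {
      merged.addAll(acc);
    }
    return merged;
  }

  @Override
  public Long extractOutput(List<Long> acc) {
    // This is where we see the failure: acc is intermittently empty at
    // trigger time, so the reduction has nothing to return and throws.
    return acc.stream().max(Long::compare).orElseThrow(IllegalStateException::new);
  }
}
{code}

It would be applied via the helper above, e.g. reducePerKey(events, new MaxFn()).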
This works well as long as the combine accumulator is non-empty at trigger time. My understanding is that no trigger should fire if no input messages were processed for a given key in a given window; conversely, if a trigger does fire, we should be able to assume that at least one element with that key arrived in that window and is present in the accumulator.
The transform is part of a job of 40-50 transforms that consumes messages from two different PubSub topics, then transforms, windows, combines and finally joins them to emit output messages to a PubSub topic. Messages are pulled from the input topics at a rate of 5-300 per second, depending on the time of day.
For 6+ months we ran this pipeline split into 3 separate jobs and observed no similar problems, but that setup was not optimal, as each of those jobs used only 10-30% of worker CPU. It was only after we combined those separate jobs into one that *we started observing exceptions* in the step using this transform; their direct cause is an empty accumulator at the time the window triggers fire. Those errors happened on subscriptions that had 1 hour of retention set, and the CPUs were quite stressed at the time.
We tried changing the machine type to a larger one (“-n2” -> “-n4”), and an hour of retention was consumed without errors. After another successful run consuming 3 hours of retention, we tried consuming 6 hours of retention, which again failed.
We have found similar issues in scio's bug tracker:
[https://github.com/spotify/scio/issues/778]
[https://github.com/spotify/scio/issues/1620]
The workaround proposed there is a custom `aggregateByKey` transform, also based on Combine.perKey, that takes a `zero` value which is output when the accumulator is empty. We have used this workaround, but it is not ideal, as in some cases there is no good default value, e.g. for the last/first message in a window.
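For illustration, the zero-value workaround amounts to something like the following sketch, extending the hypothetical MaxFn above (this is not scio's actual `aggregateByKey` code):

{code:java}
import java.util.List;

// Workaround sketch: emit a caller-supplied `zero` value instead of
// throwing when the accumulator turns out to be empty at trigger time.
class MaxWithZeroFn extends MaxFn {
  private final Long zero;

  MaxWithZeroFn(Long zero) {
    this.zero = zero;
  }

  @Override
  public Long extractOutput(List<Long> acc) {
    return acc.stream().max(Long::compare).orElse(zero);
  }
}
{code}

For a max this is workable (e.g. Long.MIN_VALUE as the zero), but for first/last-in-window semantics there is no meaningful zero, which is why we consider this only a workaround.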
While searching through Beam's Jira I found an issue that may be similar to ours: https://issues.apache.org/jira/browse/BEAM-7614
I assume these issues happen when CPU, memory, or both are stressed during the catch-up phase after starting a job with some retention backlog to consume.