[
https://issues.apache.org/jira/browse/BEAM-7639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ismaël Mejía updated BEAM-7639:
-------------------------------
Status: Open (was: Triage Needed)
> Intermittent empty accumulator values in extractOutput of Combine.perKey on
> Dataflow
> ------------------------------------------------------------------------------------
>
> Key: BEAM-7639
> URL: https://issues.apache.org/jira/browse/BEAM-7639
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Affects Versions: 2.10.0
> Reporter: Piotr Szczepanik
> Priority: Major
>
> We are using Spotify’s scio 0.7.2, built on Apache Beam 2.10.0, on
> Google Dataflow in streaming mode with fixed windows.
> With these versions we have observed strange and, unfortunately,
> intermittent behaviour of the Combine.perKey transform when used to implement
> a reduce operation, e.g. emitting the maximum value per key or the value
> derived from the last element for that key in the window.
> Such reductions are implemented in scio as a Combine.CombineFn whose
> accumulator is created as an empty ArrayList, with extractOutput performing
> the actual reduction and returning the output value.
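> A minimal sketch of such a reduce-style combiner, in plain Java with no Beam
> dependency (the method names mirror Beam's Combine.CombineFn contract, but
> the class itself is illustrative, not the actual scio code):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for a scio reduce-style combiner. The method names
// follow Beam's Combine.CombineFn contract (createAccumulator / addInput /
// mergeAccumulators / extractOutput); this is NOT the real scio class.
public class MaxCombineFn {

    public List<Integer> createAccumulator() {
        return new ArrayList<>();          // accumulator starts empty
    }

    public List<Integer> addInput(List<Integer> acc, Integer input) {
        acc.add(input);
        return acc;
    }

    public List<Integer> mergeAccumulators(Iterable<List<Integer>> accs) {
        List<Integer> merged = new ArrayList<>();
        for (List<Integer> a : accs) {
            merged.addAll(a);
        }
        return merged;
    }

    // The actual reduction happens here; an empty accumulator at trigger
    // time (the behaviour described in this issue) makes it throw.
    public Integer extractOutput(List<Integer> acc) {
        if (acc.isEmpty()) {
            throw new IllegalStateException("empty accumulator at trigger time");
        }
        return acc.stream().max(Integer::compare).get();
    }

    public static void main(String[] args) {
        MaxCombineFn fn = new MaxCombineFn();
        List<Integer> acc = fn.createAccumulator();
        acc = fn.addInput(acc, 3);
        acc = fn.addInput(acc, 7);
        System.out.println(fn.extractOutput(acc)); // prints 7
    }
}
```

> With at least one element in the accumulator extractOutput behaves as
> expected; an empty accumulator at trigger time is exactly the failure mode
> reported here.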
> This works well as long as the combine accumulator is non-empty at trigger
> time. My understanding is that no trigger should fire if no input messages
> were processed for a given key in a given window; conversely, if a trigger
> does fire, we may assume there was at least one element with that key in the
> window, and that it should be in the accumulator.
> The transform is part of a job of 40-50 transforms that consumes messages
> from two different PubSub topics, transforms, windows, and combines them, and
> then joins them to emit output messages to a PubSub topic. Messages arrive on
> the input topics at 5-300 per second, depending on the time of day.
> We ran this pipeline split into 3 separate jobs for 6+ months and observed
> no similar problems, but that setup was not optimal, as each of those jobs
> used only 10-30% of worker CPU. Only after we combined those separate jobs
> into one *did we start observing exceptions* in the step using this specific
> transform, whose direct cause is an empty accumulator at the time the window
> triggers fire. Those errors happened on subscriptions with a 1-hour retention
> backlog, while the CPUs were under heavy load.
> We tried switching to larger machine types (“-n2” -> “-n4”), and an hour of
> retention backlog was consumed without errors. A further run with 3 hours of
> retention also succeeded, but consuming 6 hours of retention failed again.
> We have found similar issues at scio's bugtracker:
> [https://github.com/spotify/scio/issues/778]
> [https://github.com/spotify/scio/issues/1620]
> The workaround proposed there is a custom `aggregateByKey` transform, also
> based on Combine.perKey, which takes a `zero` value that is output when the
> accumulator is empty. We adopted this workaround, but it is not ideal, as in
> some cases there is no good default value, e.g. for the last/first message in
> a window.
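> Under the same illustrative assumptions as above (plain Java mimicking the
> CombineFn contract, not the real scio `aggregateByKey`), the workaround
> amounts to seeding the combiner with an explicit zero:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the workaround: a combiner that carries an explicit `zero`
// emitted when the accumulator is empty at trigger time. Names are
// illustrative; the real scio `aggregateByKey` differs in detail.
public class MaxWithZeroFn {

    private final int zero;

    public MaxWithZeroFn(int zero) {
        this.zero = zero;
    }

    public List<Integer> createAccumulator() {
        return new ArrayList<>();
    }

    public List<Integer> addInput(List<Integer> acc, Integer input) {
        acc.add(input);
        return acc;
    }

    // Falls back to `zero` instead of throwing. This is fine for max/sum,
    // but there is no sensible zero for "last/first message in window".
    public Integer extractOutput(List<Integer> acc) {
        return acc.stream().max(Integer::compare).orElse(zero);
    }
}
```

> This avoids the exception at the cost of emitting a possibly meaningless
> default downstream, which is why it does not cover all of our reductions.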
> While searching through Beam's JIRA I found an issue that may be similar to
> ours: https://issues.apache.org/jira/browse/BEAM-7614
> I assume these issues happen when CPU, memory, or both are stressed during
> the catch-up phase after starting a job with retention backlog to consume.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)