Piotr Szczepanik created BEAM-7639:
--------------------------------------

             Summary: Intermittent empty accumulator values in extractOutput of 
Combine.perKey on Dataflow
                 Key: BEAM-7639
                 URL: https://issues.apache.org/jira/browse/BEAM-7639
             Project: Beam
          Issue Type: Bug
          Components: runner-dataflow
    Affects Versions: 2.10.0
            Reporter: Piotr Szczepanik


We are using Spotify’s scio 0.7.2, which is built on Apache Beam 2.10.0, running on 
Google Dataflow in streaming mode with fixed windows.

With these versions we have observed strange and, unfortunately, intermittent 
behaviour in the Combine.perKey transform, which we use to implement reduce 
operations, e.g. emitting the maximum value per key, or the value derived from the 
last element with that key in the window.

Such reductions are implemented in scio as a Combine.CombineFn whose accumulator 
is created as an empty ArrayList; extractOutput performs the actual reduction and 
returns the output value.
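To illustrate the pattern, here is a minimal sketch in plain Java (hypothetical class and method names modelled on Beam's CombineFn contract, without the Beam dependency; scio's actual implementation differs):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the scio-style reduce pattern described above (hypothetical
// class; the real implementation extends Beam's Combine.CombineFn).
public class LastValueFn {
    // createAccumulator: the accumulator starts as an empty ArrayList.
    public List<String> createAccumulator() {
        return new ArrayList<>();
    }

    // addInput: buffer the element; the actual reduction is deferred.
    public List<String> addInput(List<String> acc, String input) {
        acc.add(input);
        return acc;
    }

    // mergeAccumulators: concatenate partial buffers from parallel workers.
    public List<String> mergeAccumulators(Iterable<List<String>> accs) {
        List<String> merged = new ArrayList<>();
        for (List<String> a : accs) {
            merged.addAll(a);
        }
        return merged;
    }

    // extractOutput: the actual reduction, e.g. "last element wins".
    // Throws if the accumulator is unexpectedly empty -- exactly the
    // failure mode reported in this issue.
    public String extractOutput(List<String> acc) {
        if (acc.isEmpty()) {
            throw new IllegalStateException("empty accumulator at trigger time");
        }
        return acc.get(acc.size() - 1);
    }
}
```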

This works well when the combine accumulator is non-empty at trigger time. My 
understanding is that no trigger should fire if no input messages were processed 
in the given window for a given key; conversely, if a trigger does fire, we may 
assume there was at least one element with that key in that window, and it should 
be present in the accumulator.

The transform is part of a job consisting of 40-50 transforms that consumes 
messages from two different PubSub topics, transforming, windowing, and combining 
them, then joining them to emit output messages to a PubSub topic. Messages in 
the input topics arrive at 5-300 per second, depending on the time of day.

For 6+ months we ran this pipeline split into 3 separate jobs and observed no 
similar problems, but that setup was not optimal as each of those jobs used only 
10-30% of worker CPU. It was after we combined those separate jobs into one that 
*we started observing exceptions* in the step where the specific transform is 
used, and their direct cause is an empty accumulator at the time the window 
triggers fire. Those errors happened on subscriptions with a 1-hour retention 
set, and the CPUs were quite stressed at the time.

We tried changing the machine type to a larger one (“-n2” -> “-n4”), and an hour 
of retention was consumed without errors. After another successful run consuming 
3 hours of retention, we tried consuming 6 hours of retention, which failed 
again.

We have found similar issues in scio's bug tracker:

[https://github.com/spotify/scio/issues/778]

[https://github.com/spotify/scio/issues/1620]

The workaround proposed there is to use a custom `aggregateByKey` transform, 
which is also based on Combine.perKey but takes a `zero` value that is output 
when the accumulator is empty. We used this workaround, but it is not optimal 
because in some cases there is no good default value, e.g. for the last/first 
message in a window.
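A sketch of that workaround pattern (hypothetical class and names; the real `aggregateByKey` lives in scio and wraps Combine.perKey): extractOutput falls back to the supplied zero instead of failing on an empty accumulator, which is why it cannot express first/last-in-window reductions that have no natural zero.

```java
import java.util.List;

// Sketch of the zero-value workaround (hypothetical class; illustrates
// the idea, not scio's actual aggregateByKey implementation).
public class MaxWithZeroFn {
    private final int zero;

    public MaxWithZeroFn(int zero) {
        this.zero = zero;
    }

    // extractOutput: when the accumulator is unexpectedly empty, emit the
    // zero value instead of throwing. This works for max (zero =
    // Integer.MIN_VALUE), but there is no sensible zero for
    // "last message in window".
    public int extractOutput(List<Integer> acc) {
        return acc.stream().max(Integer::compare).orElse(zero);
    }
}
```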

While searching through Beam's Jira I found an issue that may be similar to 
ours: https://issues.apache.org/jira/browse/BEAM-7614

I assume that this issue happens when the CPU, memory, or both are stressed 
during the catch-up phase after starting a job with some retention backlog to 
consume.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
