[ https://issues.apache.org/jira/browse/BEAM-5500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16630515#comment-16630515 ]
Thomas Weise commented on BEAM-5500:
------------------------------------

The pipeline runs in streaming mode. With the Flink runner, every record is currently processed as a separate bundle, and the pipeline in the issue description has multiple executable stages. The leak is therefore roughly number_of_records * number_of_stages * 300 bytes, i.e. about 300 bytes per bundle in each stage. So if we process 1 million records with 2 stages, we have leaked 600 MB of memory.

> Portable python sdk worker leaks memory in streaming mode
> ---------------------------------------------------------
>
> Key: BEAM-5500
> URL: https://issues.apache.org/jira/browse/BEAM-5500
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-harness
> Reporter: Micah Wylde
> Assignee: Robert Bradshaw
> Priority: Major
> Attachments: chart.png
>
>
> When using the portable Python SDK with Flink in streaming mode, we see that
> the Python worker processes steadily increase memory usage until they are OOM
> killed. This behavior is consistent across various kinds of streaming
> pipelines, including those with fixed windows and global windows.
> A simple wordcount-like pipeline demonstrates the issue for us (note this is
> run on the [Lyft beam fork|https://github.com/lyft/beam/], which provides
> access to Kinesis as a portable streaming source):
> {code:python}
> import logging
>
> import apache_beam as beam
> from apache_beam.transforms import window
> from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime
> # FlinkKinesisInput comes from the Lyft beam fork (import path not shown).
> # `p`, `decode` and `count_ones` are defined elsewhere (not shown).
>
> counts = (p
>           | 'Kinesis' >> FlinkKinesisInput().with_stream('test-stream')
>           | 'decode' >> beam.FlatMap(decode)  # parses from json into python objs
>           | 'pair_with_one' >> beam.Map(lambda x: (x["event_name"], 1))
>           | 'window' >> beam.WindowInto(window.GlobalWindows(),
>                                         trigger=AfterProcessingTime(15 * 1000),
>                                         accumulation_mode=AccumulationMode.DISCARDING)
>           | 'group' >> beam.GroupByKey()
>           | 'count' >> beam.Map(count_ones)
>           # logging.warning returns None, so `or x` passes the element through
>           | beam.Map(lambda x: logging.warning("count: %s", str(x)) or x))
> {code}
> When run, we see a steady increase in memory usage in the sdk_worker process.
> Using [heapy|http://guppy-pe.sourceforge.net/#Heapy] I've analyzed the memory
> usage over time and found that it's largely dicts and strings (see attached
> chart).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)