[
https://issues.apache.org/jira/browse/FLINK-34631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mark Lidenberg closed FLINK-34631.
----------------------------------
Resolution: Not A Problem
After further investigation and testing, I've found that the issue was due to a
mistake in my test setup. The provided example works fine: in my setup, memory
grows to approximately 2 GB and then starts dropping, so everything is
functioning as expected.
I'll look deeper into our production memory problems.
I'm closing this ticket for now.
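For reference, here is a minimal sketch of how the managed memory available to
RocksDB can be bounded explicitly, which is likely what caps the growth around
2 GB in my setup. The specific sizes are illustrative assumptions, not values
taken from this ticket:

```python
from pyflink.common import Configuration
from pyflink.datastream import EmbeddedRocksDBStateBackend, StreamExecutionEnvironment

config = Configuration()
# Let RocksDB draw its block cache and write buffers from Flink's managed memory
config.set_string("state.backend.rocksdb.memory.managed", "true")
# Cap the managed memory pool (illustrative size) so RocksDB cannot grow unbounded
config.set_string("taskmanager.memory.managed.size", "512m")

environment = StreamExecutionEnvironment.get_execution_environment(config)
environment.set_state_backend(EmbeddedRocksDBStateBackend())
```

With a cap like this, RSS should plateau near the configured budget rather than
growing until the task manager crashes.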
> Possible memory leak in pyflink when using state with RocksDB
> --------------------------------------------------------------
>
> Key: FLINK-34631
> URL: https://issues.apache.org/jira/browse/FLINK-34631
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.18.1
> Reporter: Mark Lidenberg
> Priority: Major
>
> I have had issues with memory constantly growing on our production pyflink
> task managers until they crash, which should not really happen when we use
> RocksDB as our state backend.
> I've made a simple example to demonstrate the possible memory leak. In this
> example I update the state with a 1 MB value for each key and then sleep for
> 1 second. Memory grows by 1 MB per second until the process crashes, as if
> the state value stays in memory. The same thing happens if I send 100
> messages per second at 10 kB each. I've also tested `MapState`; it behaves
> the same.
> Either there is a memory leak, or my setup with the default RocksDB
> configuration just doesn't fit the example.
>
> ```python
> import time
>
> import psutil
> from pyflink.common import Types
> from pyflink.datastream import (
>     EmbeddedRocksDBStateBackend,
>     KeyedProcessFunction,
>     RuntimeContext,
>     StreamExecutionEnvironment,
> )
> from pyflink.datastream.state import ValueStateDescriptor
>
>
> class Processor(KeyedProcessFunction):
>     def open(self, runtime_context: RuntimeContext):
>         self.state = runtime_context.get_state(
>             ValueStateDescriptor(
>                 name="my_state",
>                 value_type_info=Types.STRING(),
>             )
>         )
>
>     def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
>         print(
>             "Processing", value,
>             "Memory:", round(psutil.Process().memory_info().rss / 1024 / 1024, 2), "MB",
>         )
>         # Processing 1 Memory: 171.25 MB -> Processing 2 Memory: 172.12 MB -> ...
>         # grows 1 MB per second, which should not happen because we use RocksDB
>         # as the state backend
>         self.state.update("a" * 1_000_000)  # 1 MB of state data per second
>         time.sleep(1.0)
>
>
> if __name__ == "__main__":
>     # - Create flink environment
>     environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)
>
>     # - Make sure to use RocksDB as state backend
>     environment.set_state_backend(EmbeddedRocksDBStateBackend())
>
>     # - Create pipeline
>     (
>         environment.from_collection(
>             collection=list(range(3600 * 12)),
>         )
>         .key_by(lambda value: value)
>         .process(Processor())
>     )
>
>     # - Execute pipeline
>     environment.execute(job_name="memory_leak_test")
> ```
--
This message was sent by Atlassian Jira
(v8.20.10#820010)