[
https://issues.apache.org/jira/browse/FLINK-34631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mark Lidenberg updated FLINK-34631:
-----------------------------------
Summary: Possible memory leak in pyflink when using state with RocksDB
(was: Memory leak in pyflink when using state with RocksDB )
> Possible memory leak in pyflink when using state with RocksDB
> --------------------------------------------------------------
>
> Key: FLINK-34631
> URL: https://issues.apache.org/jira/browse/FLINK-34631
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.18.1
> Reporter: Mark Lidenberg
> Priority: Major
>
> I have had issues with memory constantly growing on pyflink task managers,
> which should not really happen when we use RocksDB as our state backend.
> I've made a simple example to demonstrate the leak: for each key I update
> the state with a 1 MB value and then sleep for 1 second. Memory grows by
> about 1 MB per second until the process crashes, as if the state value were
> retained in memory. The same thing happens if I send 100 messages per second
> at 10 kB each: memory keeps growing indefinitely. I've also tested
> `MapState`; the behaviour is the same.
>
> ```python
> import time
>
> import psutil
> from pyflink.common import Types
> from pyflink.datastream import (
>     EmbeddedRocksDBStateBackend,
>     KeyedProcessFunction,
>     RuntimeContext,
>     StreamExecutionEnvironment,
> )
> from pyflink.datastream.state import ValueStateDescriptor
>
>
> class Processor(KeyedProcessFunction):
>     def open(self, runtime_context: RuntimeContext):
>         self.state = runtime_context.get_state(
>             ValueStateDescriptor(
>                 name="my_state",
>                 value_type_info=Types.STRING(),
>             )
>         )
>
>     def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
>         print(
>             "Processing", value,
>             "Memory:", round(psutil.Process().memory_info().rss / 1024 / 1024, 2), "MB",
>         )
>         # Processing 1 Memory: 171.25 MB -> Processing 2 Memory: 172.12 MB -> ...
>         # grows ~1 MB per second, which should not happen because we use
>         # RocksDB as the state backend
>         self.state.update("a" * 1_000_000)  # 1 MB of data per second
>         time.sleep(1.0)
>
>
> if __name__ == "__main__":
>     # - Create flink environment
>     environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)
>
>     # - Make sure to use RocksDB as state backend
>     environment.set_state_backend(EmbeddedRocksDBStateBackend())
>
>     # - Create pipeline
>     (
>         environment.from_collection(
>             collection=list(range(3600 * 12)),
>         )
>         .key_by(lambda value: value)
>         .process(Processor())
>     )
>
>     # - Execute pipeline
>     environment.execute(job_name="memory_leak_test")
> ```
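One way to narrow down whether the retained bytes live on the Python heap (rather than in RocksDB's native memory) is to sample `tracemalloc` around the state updates. The following is a minimal stdlib-only sketch of that idea, not pyflink code: `leaky_update` and the plain dict are hypothetical stand-ins that simulate a backend retaining every value in Python, which is the behaviour the repro above suggests.

```python
import tracemalloc

def leaky_update(store, key, payload):
    # Hypothetical stand-in for a state backend that retains every value
    # on the Python heap instead of handing it off to native storage.
    store[key] = payload

tracemalloc.start()
store = {}
before, _ = tracemalloc.get_traced_memory()
for key in range(10):
    leaky_update(store, key, "a" * 1_000_000)  # 1 MB payload per key, as in the repro
after, _ = tracemalloc.get_traced_memory()
growth_mb = (after - before) / 1024 / 1024
print(f"Python-heap growth: {growth_mb:.1f} MB")  # roughly 9-10 MB when values are retained
tracemalloc.stop()
```

If the same instrumentation inside `process_element` showed Python-heap growth matching RSS growth, that would point at values being cached on the Python side rather than at RocksDB itself.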
--
This message was sent by Atlassian Jira
(v8.20.10#820010)