[
https://issues.apache.org/jira/browse/FLINK-34631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mark Lidenberg updated FLINK-34631:
-----------------------------------
Description:
I have had issues with memory constantly growing on pyflink task managers in our
production pipelines until they crash, which should not happen when we use
RocksDB as our state backend.
I've made a simple example to demonstrate the possible memory leak. In this
example I update the state with a 1 MB value for each key and then sleep for 1
second. Memory grows by 1 MB per second until the process crashes, as if the
state value stayed in memory. The same thing happens if I send 100 messages per
second at 10 KB each. I've also tested `MapState`, and it behaves the same.
Either there is a memory leak, or my setup with the default RocksDB
configuration just doesn't fit the example.
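In case it is a configuration issue: I haven't tuned anything, so all of the settings below are at their defaults. The knobs I would guess are relevant (option names as of Flink 1.18; this is a guess at what to check, not a diagnosis) would go in `flink-conf.yaml` roughly like:

```yaml
# Illustrative values only, not a recommended configuration.
# Let RocksDB allocate from Flink's managed memory budget (default: true):
state.backend.rocksdb.memory.managed: true
# Size of the managed memory pool RocksDB draws from:
taskmanager.memory.managed.size: 512m
# PyFlink keeps an LRU cache of state values on the Python side
# (default: 1000 entries); setting it to 0 would rule that cache out:
python.state.cache-size: 0
```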
```python
import time

import psutil
from pyflink.common import Types
from pyflink.datastream import (
    EmbeddedRocksDBStateBackend,
    KeyedProcessFunction,
    RuntimeContext,
    StreamExecutionEnvironment,
)
from pyflink.datastream.state import ValueStateDescriptor


class Processor(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        self.state = runtime_context.get_state(
            ValueStateDescriptor(
                name="my_state",
                value_type_info=Types.STRING(),
            )
        )

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        print(
            "Processing", value,
            "Memory:", round(psutil.Process().memory_info().rss / 1024 / 1024, 2), "MB",
        )
        # Processing 1 Memory: 171.25 MB -> Processing 2 Memory: 172.12 MB -> ...
        # grows 1 MB per second, which should not happen because we use
        # RocksDB as the state backend
        self.state.update("a" * 1_000_000)  # 1 MB of data per second
        time.sleep(1.0)


if __name__ == "__main__":
    # - Create flink environment
    environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)

    # - Make sure to use RocksDB as state backend
    environment.set_state_backend(EmbeddedRocksDBStateBackend())

    # - Create pipeline
    (
        environment.from_collection(
            collection=list(range(3600 * 12)),
        )
        .key_by(lambda value: value)
        .process(Processor())
    )

    # - Execute pipeline
    environment.execute(job_name="memory_leak_test")
```
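As a sanity check on the measurement itself, independent of Flink: if each 1 MB value were released after `update()`, RSS should plateau, while anything holding a reference to old values would grow linearly. A stdlib-only sketch of that distinction (no pyflink involved; `churn` is my own helper name, not a Flink API):

```python
import tracemalloc


def churn(n: int, retain: bool) -> int:
    """Allocate n one-megabyte strings; return bytes still live afterwards."""
    tracemalloc.start()
    kept = []
    value = None
    for _ in range(n):
        value = "a" * 1_000_000  # same size as the state update above
        if retain:
            kept.append(value)   # simulate a lingering reference (the "leak")
    live, _peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return live


if __name__ == "__main__":
    print("rebound: ", churn(10, retain=False))  # ~1 MB: old values are freed
    print("retained:", churn(10, retain=True))   # ~10 MB: grows with n
```

The pyflink pipeline above shows the "retained" pattern even though nothing in user code keeps a reference, which is why it looks like a leak rather than normal allocator behavior.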
was:
I have had issues with memory constantly growing on pyflink task managers,
which should not happen when we use RocksDB as our state backend.
I've made a simple example to demonstrate the memory leak. In this example I
update the state with a 1 MB value for each key and then sleep for 1 second.
Memory grows by 1 MB per second until the process crashes, as if the state
value stayed in memory. The same thing happens if I send 100 messages per
second at 10 KB each; memory keeps growing indefinitely. I've also tested
`MapState`, and it behaves the same.
```python
import time

import psutil
from pyflink.common import Types
from pyflink.datastream import (
    EmbeddedRocksDBStateBackend,
    KeyedProcessFunction,
    RuntimeContext,
    StreamExecutionEnvironment,
)
from pyflink.datastream.state import ValueStateDescriptor


class Processor(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        self.state = runtime_context.get_state(
            ValueStateDescriptor(
                name="my_state",
                value_type_info=Types.STRING(),
            )
        )

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        print(
            "Processing", value,
            "Memory:", round(psutil.Process().memory_info().rss / 1024 / 1024, 2), "MB",
        )
        # Processing 1 Memory: 171.25 MB -> Processing 2 Memory: 172.12 MB -> ...
        # grows 1 MB per second, which should not happen because we use
        # RocksDB as the state backend
        self.state.update("a" * 1_000_000)  # 1 MB of data per second
        time.sleep(1.0)


if __name__ == "__main__":
    # - Create flink environment
    environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)

    # - Make sure to use RocksDB as state backend
    environment.set_state_backend(EmbeddedRocksDBStateBackend())

    # - Create pipeline
    (
        environment.from_collection(
            collection=list(range(3600 * 12)),
        )
        .key_by(lambda value: value)
        .process(Processor())
    )

    # - Execute pipeline
    environment.execute(job_name="memory_leak_test")
```
> Possible memory leak in pyflink when using state with RocksDB
> --------------------------------------------------------------
>
> Key: FLINK-34631
> URL: https://issues.apache.org/jira/browse/FLINK-34631
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.18.1
> Reporter: Mark Lidenberg
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)