[ https://issues.apache.org/jira/browse/FLINK-34631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mark Lidenberg updated FLINK-34631:
-----------------------------------
    Description: 
I have been seeing memory grow continuously on the pyflink task managers of our 
production pipelines until they crash, which should not happen given that we use 
RocksDB as our state backend. 

I've put together a simple example to demonstrate the possible memory leak. In this 
example I update the state with a 1 MB value for each key and then sleep for 1 second. 
Memory grows by about 1 MB per second until the process crashes, as if the state value 
stayed in memory. The same thing happens if I send 100 messages per second of 10 KB 
each, and also when I use `MapState` instead of `ValueState`. 

Either there is a memory leak, or my setup with default RocksDB configuration 
just doesn't fit the example. 
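
For what it's worth, this is the kind of explicit RocksDB memory configuration I would try in order to rule out the second possibility. It is only a sketch: the option keys below are my assumption about the relevant knobs, and I have not verified that they change the behaviour of the example.

```python
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

# Assumed configuration keys for bounding RocksDB memory (not verified against this example).
config = Configuration()
config.set_string("state.backend.rocksdb.memory.managed", "true")  # keep RocksDB inside Flink managed memory
config.set_string("taskmanager.memory.managed.fraction", "0.4")    # cap the managed-memory share of the task manager

environment = StreamExecutionEnvironment.get_execution_environment(config)
```

The full reproduction script follows.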

 

```python 
import time

import psutil

from pyflink.common import Types
from pyflink.datastream import (
    EmbeddedRocksDBStateBackend,
    KeyedProcessFunction,
    RuntimeContext,
    StreamExecutionEnvironment,
)
from pyflink.datastream.state import ValueStateDescriptor

class Processor(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        self.state = runtime_context.get_state(
            ValueStateDescriptor(
                name="my_state",
                value_type_info=Types.STRING(),
            )
        )

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        print("Processing", value, "Memory: ", 
round(psutil.Process().memory_info().rss / 1024 / 1024, 2), "MB")

        # Processing 1 Memory:  171.25 MB -> Processing 2 Memory:  172.12 MB -> 
... grows 1mb per second, which should not happen because we use RocksDB as 
state backend
        self.state.update("a" * 1_000_000)  # 1 mb of data per second
        time.sleep(1.0)

if __name__ == "__main__":
    # - Create flink environment

    environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)

    # - Make sure to use RocksDB as state backend

    environment.set_state_backend(EmbeddedRocksDBStateBackend())

    # - Create pipeline

    (
        environment.from_collection(
            collection=list(range(3600 * 12)),
        )
        .key_by(lambda value: value)
        .process(Processor())
    )

    # - Execute pipeline

    environment.execute(job_name="memory_leak_test")

```
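
To narrow down where the growth happens, I would also compare the process RSS with what the Python allocator itself reports. The snippet below is only a sketch of that instrumentation (standard-library `tracemalloc` plus `psutil`), not part of the reproduction above: if RSS keeps climbing while the traced Python heap stays flat, the retained memory lives outside ordinary Python objects.

```python
import tracemalloc

import psutil

# Standalone sanity check of the measurement itself; in the job I would call
# report_memory() from process_element() next to the existing print.
tracemalloc.start()


def report_memory(label: str) -> None:
    rss_mb = psutil.Process().memory_info().rss / 1024 / 1024
    heap_mb = tracemalloc.get_traced_memory()[0] / 1024 / 1024
    print(f"{label}: RSS {rss_mb:.2f} MB, Python heap {heap_mb:.2f} MB")


if __name__ == "__main__":
    report_memory("baseline")
    values = ["a" * 1_000_000 for _ in range(10)]  # hold ten 1 MB strings as a reference point
    report_memory("after allocating ~10 MB of strings")
```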



> Possible memory leak in pyflink when using state with RocksDB 
> --------------------------------------------------------------
>
>                 Key: FLINK-34631
>                 URL: https://issues.apache.org/jira/browse/FLINK-34631
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.18.1
>            Reporter: Mark Lidenberg
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
