[ 
https://issues.apache.org/jira/browse/FLINK-34631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mark Lidenberg updated FLINK-34631:
-----------------------------------
    Description: 
I have had issues with memory constantly growing on pyflink task managers, 
which should not really happen when we use RocksDB as our state backend. 

I've made a simple example to demonstrate the memory leak. In this example I 
update state with 1mb value for each key and then sleep for 1 second. Memory 
growth 1mb per second until the process crashes, as if the state value stays in 
memory. Same thing happens if I send 100 messages per second with 10kb each. 
Memory keeps growing indefinitely. I've also tested `MapState`, it's the same. 

 

```python 
import time

import psutil

from pyflink.common import Types
from pyflink.datastream import (
    EmbeddedRocksDBStateBackend,
    KeyedProcessFunction,
    RuntimeContext,
    StreamExecutionEnvironment,
)
from pyflink.datastream.state import ValueStateDescriptor

class Processor(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        self.state = runtime_context.get_state(
            ValueStateDescriptor(
                name="my_state",
                value_type_info=Types.STRING(),
            )
        )

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        print("Processing", value, "Memory: ", 
round(psutil.Process().memory_info().rss / 1024 / 1024, 2), "MB")

        # Processing 1 Memory:  171.25 MB -> Processing 2 Memory:  172.12 MB -> 
... grows 1mb per second, which should not happen because we use RocksDB as 
state backend
        self.state.update("a" * 1_000_000)  # 1 mb of data per second
        time.sleep(1.0)

if _{_}name{_}_ == "_{_}main{_}_":
    # - Create flink environment

    environment = 
StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)

    # - Make sure to use RocksDB as state backend

    environment.set_state_backend(EmbeddedRocksDBStateBackend())

    # - Create pipeline

    (
        environment.from_collection(
            collection=list(range(3600 * 12)),
        )
        .key_by(lambda value: value)
        .process(Processor())
    )

    # - Execute pipeline

    environment.execute(job_name="memory_leak_test")

```

  was:
I have had issues with memory constantly growing on pyflink task managers, 
which should not really happen when we use RocksDB as our state backend. 

I've made a simple example to demonstrate the memory leak. In this example I 
update state with 1mb value for each key and then sleep for 1 second. Memory 
growth 1mb per second, as if the state value stays in memory. Same thing 
happens if I send 100 messages per second with 10kb each. Memory keeps growing 
indefinitely. I've also tested `MapState`, it's the same. 

 

```python 
import time

import psutil

from pyflink.common import Types
from pyflink.datastream import (
    EmbeddedRocksDBStateBackend,
    KeyedProcessFunction,
    RuntimeContext,
    StreamExecutionEnvironment,
)
from pyflink.datastream.state import ValueStateDescriptor


class Processor(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        self.state = runtime_context.get_state(
            ValueStateDescriptor(
                name="my_state",
                value_type_info=Types.STRING(),
            )
        )

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        print("Processing", value, "Memory: ", 
round(psutil.Process().memory_info().rss / 1024 / 1024, 2), "MB")

        # Processing 1 Memory:  171.25 MB -> Processing 2 Memory:  172.12 MB -> 
... grows 1mb per second, which should not happen because we use RocksDB as 
state backend
        self.state.update("a" * 1_000_000)  # 1 mb of data per second
        time.sleep(1.0)


if __name__ == "__main__":
    # - Create flink environment

    environment = 
StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)

    # - Make sure to use RocksDB as state backend

    environment.set_state_backend(EmbeddedRocksDBStateBackend())

    # - Create pipeline

    (
        environment.from_collection(
            collection=list(range(3600 * 12)),
        )
        .key_by(lambda value: value)
        .process(Processor())
    )

    # - Execute pipeline

    environment.execute(job_name="memory_leak_test")

```


> Memory leak in pyflink when using state with RocksDB 
> -----------------------------------------------------
>
>                 Key: FLINK-34631
>                 URL: https://issues.apache.org/jira/browse/FLINK-34631
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.18.1
>            Reporter: Mark Lidenberg
>            Priority: Major
>
> I have had issues with memory constantly growing on pyflink task managers, 
> which should not really happen when we use RocksDB as our state backend. 
> I've made a simple example to demonstrate the memory leak. In this example I 
> update state with 1mb value for each key and then sleep for 1 second. Memory 
> growth 1mb per second until the process crashes, as if the state value stays 
> in memory. Same thing happens if I send 100 messages per second with 10kb 
> each. Memory keeps growing indefinitely. I've also tested `MapState`, it's 
> the same. 
>  
> ```python 
> import time
> import psutil
> from pyflink.common import Types
> from pyflink.datastream import (
>     EmbeddedRocksDBStateBackend,
>     KeyedProcessFunction,
>     RuntimeContext,
>     StreamExecutionEnvironment,
> )
> from pyflink.datastream.state import ValueStateDescriptor
> class Processor(KeyedProcessFunction):
>     def open(self, runtime_context: RuntimeContext):
>         self.state = runtime_context.get_state(
>             ValueStateDescriptor(
>                 name="my_state",
>                 value_type_info=Types.STRING(),
>             )
>         )
>     def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
>         print("Processing", value, "Memory: ", 
> round(psutil.Process().memory_info().rss / 1024 / 1024, 2), "MB")
>         # Processing 1 Memory:  171.25 MB -> Processing 2 Memory:  172.12 MB 
> -> ... grows 1mb per second, which should not happen because we use RocksDB 
> as state backend
>         self.state.update("a" * 1_000_000)  # 1 mb of data per second
>         time.sleep(1.0)
> if _{_}name{_}_ == "_{_}main{_}_":
>     # - Create flink environment
>     environment = 
> StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)
>     # - Make sure to use RocksDB as state backend
>     environment.set_state_backend(EmbeddedRocksDBStateBackend())
>     # - Create pipeline
>     (
>         environment.from_collection(
>             collection=list(range(3600 * 12)),
>         )
>         .key_by(lambda value: value)
>         .process(Processor())
>     )
>     # - Execute pipeline
>     environment.execute(job_name="memory_leak_test")
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to