[ https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17118622#comment-17118622 ]

Stephan Ewen commented on FLINK-17923:
--------------------------------------

I think that before resolving this in a way that changes RocksDB's memory
management, we first need to clarify what our future model for the Python
processes is.

I am somewhat skeptical that the current approach where the Python process 
takes managed memory by default is the right one.

For example, Beam/Flink users really like to deploy the Python interpreter in
separate containers. For streaming scenarios, that is the most robust setup, I
think. In that case, the Flink TaskManager cannot dedicate its managed memory
to the Python process.

The current model also seems somewhat unpredictable when it comes to
performance. If you have many slots and operators, each process's memory budget
gets very small. It may even start swapping eventually.
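
Not a proposal for the fix, just to make the knobs concrete: with today's model,
the budget that RocksDB's shared cache and the Python worker compete for is
governed by the existing memory options. Below is a minimal sketch (not taken
from this ticket) that assumes a PyFlink job setting the documented keys
taskmanager.memory.managed.size and state.backend.rocksdb.memory.managed
through its TableConfig; in a real deployment these would normally live in
flink-conf.yaml.

{code}
# Illustrative only; whether per-job settings like these take effect depends on
# how the job is deployed (cluster-level flink-conf.yaml is the usual place).
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, TableConfig

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env, TableConfig())
conf = t_env.get_config().get_configuration()

# Enlarge the slot's managed-memory budget so that the RocksDB shared cache and
# the Python worker are less likely to starve each other.
conf.set_string("taskmanager.memory.managed.size", "1gb")

# Or opt RocksDB out of managed memory entirely; it then sizes its caches on
# its own, outside the slot budget.
conf.set_string("state.backend.rocksdb.memory.managed", "false")
{code}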


> It will throw MemoryAllocationException if rocksdb statebackend and Python 
> UDF are used in the same slot  
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17923
>                 URL: https://issues.apache.org/jira/browse/FLINK-17923
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python, Runtime / State Backends
>    Affects Versions: 1.10.0, 1.11.0
>            Reporter: Dian Fu
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>
> For the following job:
> {code}
> import logging
> import os
> import shutil
> import sys
> import tempfile
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
> def word_count():
>     content = "line Licensed to the Apache Software Foundation ASF under one 
> " \
>               "line or more contributor license agreements See the NOTICE 
> file " \
>               "line distributed with this work for additional information " \
>               "line regarding copyright ownership The ASF licenses this file 
> " \
>               "to you under the Apache License Version the " \
>               "License you may not use this file except in compliance " \
>               "with the License"
>     t_config = TableConfig()
>     env = StreamExecutionEnvironment.get_execution_environment()
>     t_env = StreamTableEnvironment.create(env, t_config)
>     # register Results table in table environment
>     tmp_dir = tempfile.gettempdir()
>     result_path = tmp_dir + '/result'
>     if os.path.exists(result_path):
>         try:
>             if os.path.isfile(result_path):
>                 os.remove(result_path)
>             else:
>                 shutil.rmtree(result_path)
>         except OSError as e:
>             logging.error("Error removing directory: %s - %s.", e.filename, e.strerror)
>     logging.info("Results directory: %s", result_path)
>     sink_ddl = """
>         create table Results(
>             word VARCHAR,
>             `count` BIGINT
>         ) with (
>             'connector' = 'blackhole'
>         )
>         """
>     t_env.sql_update(sink_ddl)
>     @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
>     def inc(count):
>         return count + 1
>     t_env.register_function("inc", inc)
>     elements = [(word, 1) for word in content.split(" ")]
>     t_env.from_elements(elements, ["word", "count"]) \
>          .group_by("word") \
>          .select("word, count(1) as count") \
>          .select("word, inc(count) as count") \
>          .insert_into("Results")
>     t_env.execute("word_count")
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>     word_count()
> {code}
> It will throw the following exception if the RocksDB state backend is used:
> {code}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from any of the 1 provided restore options.
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
>       ... 9 more
> Caused by: java.io.IOException: Failed to acquire shared cache resource for RocksDB
>       at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:212)
>       at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:516)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>       ... 11 more
> Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not created the shared memory resource of size 536870920. Not enough memory left to reserve from the slot's managed memory.
>       at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:603)
>       at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
>       at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
>       at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:617)
>       at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:566)
>       at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:208)
>       ... 15 more
> Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 536870920 bytes. Only 454033416 bytes are remaining.
>       at org.apache.flink.runtime.memory.MemoryManager.reserveMemory(MemoryManager.java:461)
>       at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:601)
>       ... 20 more
> {code}
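
For completeness (this part is not from the original report): the reproduction
above does not show how the RocksDB backend is switched on. A minimal sketch,
assuming the PyFlink state-backend wrappers shipped with 1.11 and a placeholder
checkpoint path; a cluster-wide state.backend: rocksdb entry in flink-conf.yaml
is the equivalent setting:

{code}
# Hedged sketch, not taken from the report above.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.state_backend import RocksDBStateBackend

env = StreamExecutionEnvironment.get_execution_environment()
# "file:///tmp/checkpoints" is a placeholder checkpoint URI for illustration.
env.set_state_backend(RocksDBStateBackend("file:///tmp/checkpoints"))
{code}

The numbers in the last exception fit that picture: the RocksDB shared cache
asks for 536870920 bytes (roughly 512 MiB) of managed memory, while only
454033416 bytes (roughly 433 MiB) of the slot's managed memory are still
unreserved, the rest having already been reserved by the Python worker that
shares the slot.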



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
