[https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17116500#comment-17116500]
Xintong Song commented on FLINK-17923:
--------------------------------------
The problem is that RocksDB assumes it is the only managed memory consumer in
streaming scenarios and tries to take all of the slot's managed memory, while
in fact a Python UDF might also reserve managed memory in streaming scenarios.
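To make the conflict concrete, here is quick arithmetic with the numbers from
the stack trace quoted below (the slot's managed memory size is inferred from
the trace, not read from any configuration):
{code}
rocksdb_request = 536870920   # RocksDB asks for the whole slot (fraction = 1)
remaining = 454033416         # what the MemoryManager can still reserve
already_reserved = rocksdb_request - remaining
print(already_reserved)       # 82837504 bytes (79 MiB), presumably held by the Python UDF
{code}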
h3. More background
With FLIP-53, when generating the stream graph, we make a plan for how managed
memory should be shared by the operators within a slot.
* For batch jobs, we calculate a fraction for each operator, representing the
fraction of the slot's managed memory that the operator should use. Operators
read this fraction at runtime and reserve the corresponding amount of memory
from the memory manager.
* For streaming jobs, we assumed the only managed memory consumer is
RocksDBStateBackend. Therefore, the fraction calculation (when generating the
stream graph) is omitted, and RocksDBStateBackend always reserves all (fraction
= 1) of the slot's managed memory from the memory manager. The sketch below
illustrates the difference.
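To illustrate, a minimal sketch of the planning idea (not Flink's actual code;
the operator names and weights are made up):
{code}
# Batch: a fraction is planned per operator at stream-graph generation time.
def plan_managed_memory_fractions(operator_weights):
    total = sum(operator_weights.values())
    return {op: weight / total for op, weight in operator_weights.items()}

batch_fractions = plan_managed_memory_fractions(
    {"hash-join": 2, "sort": 1, "python-udf": 1})
# -> {'hash-join': 0.5, 'sort': 0.25, 'python-udf': 0.25}

# Streaming (current behavior): the calculation is skipped and
# RocksDBStateBackend reserves everything, leaving nothing for Python UDFs.
streaming_fractions = {"rocksdb-state-backend": 1.0}
{code}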
h3. Potential solutions
There was an offline discussion between [~dian.fu], [~zhuzh], [~yunta] and me,
and here are some ideas we came up with.
# We can say that ATM we do not support Python UDF and RocksDBStateBackend
working together. We can add a check at compile time and throw an exception /
warning if they are used together. Given that release 1.11 is already frozen,
this would avoid rushing significant changes at the last minute. The drawback,
obviously, is that we lose a large portion of streaming Python UDF use cases
for release 1.11.
# We can make Python UDF not reserve managed memory. Basically, Python UDF
would use memory the same way other user code uses off-heap memory, and users
would need to explicitly configure a larger task off-heap memory (see the
configuration sketch after this list). The drawbacks of this solution are that
1) it requires more user involvement, 2) it breaks the current 1.10 behavior
in batch scenarios, where such user involvement was not needed, and 3) we
might need to revert the changes in the future.
# We can also calculate a fraction for RocksDBStateBackend, making it properly
share managed memory with Python UDFs. This is probably the most proper
solution. The problem is that there are still some open questions, such as how
to calculate the fraction (because RocksDBStateBackend allocates managed
memory per slot rather than per operator) and how to pass the fraction to the
state backend; a rough sketch of one possibility follows this list. We are not
sure whether this is doable in the 1.11 release cycle, given that it's already
frozen.
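For reference, the configuration sketch mentioned in option 2: since Python
UDF memory would then behave like ordinary user off-heap memory, users would
have to enlarge the task off-heap memory explicitly, e.g. in flink-conf.yaml
(the size below is only an illustrative guess; the right value depends on the
job):
{code}
# flink-conf.yaml -- illustrative only
taskmanager.memory.task.off-heap.size: 512mb
{code}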
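And the rough sketch mentioned in option 3, just to make the open question
concrete (hypothetical, not a design decision):
{code}
# If the other managed memory consumers in the slot (e.g. Python UDFs) were
# planned with explicit fractions, RocksDBStateBackend could take the
# remainder instead of a hard-coded fraction of 1.
def rocksdb_fraction(other_consumer_fractions):
    return max(0.0, 1.0 - sum(other_consumer_fractions))

print(rocksdb_fraction([0.3]))  # 0.7 instead of today's 1.0
{code}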
We would like to hear more opinions from the community. Many thanks.
> It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot
> ---------------------------------------------------------------------------------------------------------
>
> Key: FLINK-17923
> URL: https://issues.apache.org/jira/browse/FLINK-17923
> Project: Flink
> Issue Type: Bug
> Components: API / Python, Runtime / State Backends
> Affects Versions: 1.10.0, 1.11.0
> Reporter: Dian Fu
> Priority: Blocker
> Fix For: 1.11.0
>
>
> For the following job:
> {code}
> import logging
> import os
> import shutil
> import sys
> import tempfile
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
>
>
> def word_count():
>     content = "line Licensed to the Apache Software Foundation ASF under one " \
>               "line or more contributor license agreements See the NOTICE file " \
>               "line distributed with this work for additional information " \
>               "line regarding copyright ownership The ASF licenses this file " \
>               "to you under the Apache License Version the " \
>               "License you may not use this file except in compliance " \
>               "with the License"
>
>     t_config = TableConfig()
>     env = StreamExecutionEnvironment.get_execution_environment()
>     t_env = StreamTableEnvironment.create(env, t_config)
>
>     # register Results table in table environment
>     tmp_dir = tempfile.gettempdir()
>     result_path = tmp_dir + '/result'
>     if os.path.exists(result_path):
>         try:
>             if os.path.isfile(result_path):
>                 os.remove(result_path)
>             else:
>                 shutil.rmtree(result_path)
>         except OSError as e:
>             logging.error("Error removing directory: %s - %s.", e.filename, e.strerror)
>
>     logging.info("Results directory: %s", result_path)
>
>     sink_ddl = """
>         create table Results(
>             word VARCHAR,
>             `count` BIGINT
>         ) with (
>             'connector' = 'blackhole'
>         )
>     """
>     t_env.sql_update(sink_ddl)
>
>     @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
>     def inc(count):
>         return count + 1
>
>     t_env.register_function("inc", inc)
>
>     elements = [(word, 1) for word in content.split(" ")]
>     t_env.from_elements(elements, ["word", "count"]) \
>         .group_by("word") \
>         .select("word, count(1) as count") \
>         .select("word, inc(count) as count") \
>         .insert_into("Results")
>     t_env.execute("word_count")
>
>
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>     word_count()
> {code}
> It will throw the following exception if the RocksDB state backend is used:
> {code}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
>     ... 9 more
> Caused by: java.io.IOException: Failed to acquire shared cache resource for RocksDB
>     at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:212)
>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:516)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>     ... 11 more
> Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not created the shared memory resource of size 536870920. Not enough memory left to reserve from the slot's managed memory.
>     at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:603)
>     at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
>     at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
>     at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:617)
>     at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:566)
>     at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:208)
>     ... 15 more
> Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 536870920 bytes. Only 454033416 bytes are remaining.
>     at org.apache.flink.runtime.memory.MemoryManager.reserveMemory(MemoryManager.java:461)
>     at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:601)
>     ... 20 more
> {code}