[
https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17118489#comment-17118489
]
Xintong Song edited comment on FLINK-17923 at 5/28/20, 9:29 AM:
----------------------------------------------------------------
Thanks [~dian.fu], that sounds good to me.
was (Author: xintongsong):
[~dian.fu], that sounds good to me.
> It will throw MemoryAllocationException if rocksdb statebackend and Python
> UDF are used in the same slot
> ----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-17923
> URL: https://issues.apache.org/jira/browse/FLINK-17923
> Project: Flink
> Issue Type: Bug
> Components: API / Python, Runtime / State Backends
> Affects Versions: 1.10.0, 1.11.0
> Reporter: Dian Fu
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.11.0
>
>
> For the following job:
> {code}
> import logging
> import os
> import shutil
> import sys
> import tempfile
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
>
>
> def word_count():
>     content = "line Licensed to the Apache Software Foundation ASF under one " \
>               "line or more contributor license agreements See the NOTICE file " \
>               "line distributed with this work for additional information " \
>               "line regarding copyright ownership The ASF licenses this file " \
>               "to you under the Apache License Version the " \
>               "License you may not use this file except in compliance " \
>               "with the License"
>
>     t_config = TableConfig()
>     env = StreamExecutionEnvironment.get_execution_environment()
>     t_env = StreamTableEnvironment.create(env, t_config)
>
>     # register Results table in table environment
>     tmp_dir = tempfile.gettempdir()
>     result_path = tmp_dir + '/result'
>     if os.path.exists(result_path):
>         try:
>             if os.path.isfile(result_path):
>                 os.remove(result_path)
>             else:
>                 shutil.rmtree(result_path)
>         except OSError as e:
>             logging.error("Error removing directory: %s - %s.", e.filename, e.strerror)
>
>     logging.info("Results directory: %s", result_path)
>
>     sink_ddl = """
>         create table Results(
>             word VARCHAR,
>             `count` BIGINT
>         ) with (
>             'connector' = 'blackhole'
>         )
>         """
>     t_env.sql_update(sink_ddl)
>
>     @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
>     def inc(count):
>         return count + 1
>
>     t_env.register_function("inc", inc)
>
>     elements = [(word, 1) for word in content.split(" ")]
>     t_env.from_elements(elements, ["word", "count"]) \
>         .group_by("word") \
>         .select("word, count(1) as count") \
>         .select("word, inc(count) as count") \
>         .insert_into("Results")
>
>     t_env.execute("word_count")
>
>
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>     word_count()
> {code}
> It throws the following exception if the RocksDB state backend is used:
> {code}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
>     ... 9 more
> Caused by: java.io.IOException: Failed to acquire shared cache resource for RocksDB
>     at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:212)
>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:516)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>     ... 11 more
> Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not created the shared memory resource of size 536870920. Not enough memory left to reserve from the slot's managed memory.
>     at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:603)
>     at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
>     at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
>     at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:617)
>     at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:566)
>     at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:208)
>     ... 15 more
> Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 536870920 bytes. Only 454033416 bytes are remaining.
>     at org.apache.flink.runtime.memory.MemoryManager.reserveMemory(MemoryManager.java:461)
>     at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:601)
>     ... 20 more
> {code}
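
For readers skimming the trace, the two byte counts in the innermost MemoryReservationException can be put into MiB with a few lines of Python. This is only a sketch of the arithmetic: the figures are copied from the trace, while the helper name `describe` and the interpretation that follows are assumptions, not something stated in the issue.

```python
# Figures copied from the MemoryReservationException in the trace above.
requested = 536_870_920  # bytes RocksDB tried to reserve for its shared cache
remaining = 454_033_416  # bytes still unreserved in the slot's managed memory

MIB = 1024 * 1024


def describe(num_bytes):
    """Split a byte count into (whole MiB, leftover bytes). Hypothetical helper."""
    return divmod(num_bytes, MIB)


shortfall = requested - remaining

print("requested: %d MiB + %d bytes" % describe(requested))
print("remaining: %d MiB + %d bytes" % describe(remaining))
print("shortfall: %d MiB + %d bytes" % describe(shortfall))
```

The request works out to 512 MiB plus 8 bytes, the remaining budget to 433 MiB plus 8 bytes, and the gap to exactly 79 MiB. One plausible reading, consistent with the issue title but not confirmed by the trace itself, is that RocksDB asked for the slot's whole managed memory budget after another consumer in the same slot (presumably the Python UDF operator) had already reserved 79 MiB of it.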