[ https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17120728#comment-17120728 ]
Xintong Song commented on FLINK-17923:
--------------------------------------
[~sewen],
I'm still trying to understand what the benefit is of making Python processes
the responsibility of the deployment framework.
I'm not saying this is not the right approach. I'm asking because I have
several concerns about this approach. These concerns are probably resolvable in
one way or another, but first I would like to understand whether what we gain
is worth the effort.
*1. The resources reserved for Python UDFs (Python process containers on K8s,
and the additional memory on Yarn) might be wasted in use cases without Python
UDFs.* I think one of the reasons we made RocksDB use managed memory in
FLIP-49 is that we want the default configuration to work for all use cases
without leaving part of the memory unused. For Python, if we reserve
resources by default, these resources will be wasted in non-Python use cases.
If we don't reserve resources by default, then the default configuration does
not work for Python use cases.
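To make the dilemma in concern 1 concrete, here is a toy model of a slot whose memory is split into a fixed Python reservation and the rest. It is a sketch only: `SlotPlan`, `evaluate` and the 1 GiB / 256 MiB sizes are hypothetical and do not correspond to any Flink class or configuration option.

```python
from dataclasses import dataclass

MiB = 1024 * 1024


@dataclass
class SlotPlan:
    """Hypothetical split of one slot's memory; not a Flink class or config."""

    total_bytes: int
    python_reserved_bytes: int  # set aside up front for Python UDF workers

    @property
    def managed_bytes(self) -> int:
        # Whatever is not reserved for Python stays available as managed
        # memory for other consumers such as RocksDB.
        return self.total_bytes - self.python_reserved_bytes


def evaluate(plan: SlotPlan, uses_python: bool, python_demand_bytes: int) -> str:
    """Classify one job under a plan: 'ok', 'wasted' or 'insufficient'."""
    if not uses_python:
        # A job without Python UDFs never touches the reservation, so it idles.
        return "wasted" if plan.python_reserved_bytes > 0 else "ok"
    if python_demand_bytes > plan.python_reserved_bytes:
        # The Python workers need memory the default plan did not set aside.
        return "insufficient"
    return "ok"


# Two hypothetical defaults for the same 1 GiB slot.
reserve_by_default = SlotPlan(total_bytes=1024 * MiB, python_reserved_bytes=256 * MiB)
no_reservation = SlotPlan(total_bytes=1024 * MiB, python_reserved_bytes=0)
```

Reserving by default yields "wasted" for a non-Python job, while not reserving yields "insufficient" for a Python job; no single static default scores "ok" for both.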
*2. Which component is responsible for managing the lifecycle of Python
processes?* Do we consider the Python processes part of the Flink framework?
If so, the lifecycle of the Python processes should be decoupled from a job's
lifecycle: if the user code does something wrong that makes a Python process
fail, Flink should be able to bring it back up. This could be achieved
naturally on Kubernetes, but not on Yarn. On Yarn, once a container is started,
the YarnResourceManager can no longer start another process in it. We would
probably need to start another service in YarnTaskExecutorRunner to
start/monitor/recover/stop the Python processes. That sounds similar to just
having the TaskManager manage the Python processes.
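The start/monitor/recover/stop responsibilities from concern 2 can be sketched as a small supervisor around a child process. This is an illustration under stated assumptions, not Flink code: `PythonProcessSupervisor`, `python_worker_argv` and the restart budget are hypothetical names, and a real service would also need health checks and back-off.

```python
import subprocess
import sys


def python_worker_argv(code: str) -> list:
    """Hypothetical helper: argv that runs `code` in a fresh interpreter."""
    return [sys.executable, "-c", code]


class PythonProcessSupervisor:
    """Hypothetical start/monitor/recover/stop loop for one Python worker.

    Illustration only; this is not a Flink class.
    """

    def __init__(self, argv, max_restarts: int = 3) -> None:
        self.argv = argv
        self.max_restarts = max_restarts
        self.restarts = 0
        self.proc = None

    def start(self) -> None:
        self.proc = subprocess.Popen(self.argv)

    def monitor_once(self):
        """Poll the worker once and restart it if it died with a non-zero code.

        Returns None while running, otherwise the observed exit code.
        """
        code = self.proc.poll()
        if code is None or code == 0:
            return code  # still running, or exited cleanly
        if self.restarts >= self.max_restarts:
            raise RuntimeError(f"worker failed {self.restarts + 1} times; giving up")
        self.restarts += 1
        self.start()  # recover by launching a fresh process
        return code

    def stop(self, timeout: float = 5.0) -> None:
        """Terminate the worker, escalating to kill if it does not exit in time."""
        if self.proc is not None and self.proc.poll() is None:
            self.proc.terminate()
            try:
                self.proc.wait(timeout=timeout)
            except subprocess.TimeoutExpired:
                self.proc.kill()
                self.proc.wait()
```

Whichever component owns this loop (a Yarn-side service or the TaskManager) needs essentially the same logic, which is why the two options look alike.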
> It will throw MemoryAllocationException if rocksdb statebackend and Python
> UDF are used in the same slot
> ----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-17923
> URL: https://issues.apache.org/jira/browse/FLINK-17923
> Project: Flink
> Issue Type: Bug
> Components: API / Python, Runtime / State Backends
> Affects Versions: 1.10.0, 1.11.0
> Reporter: Dian Fu
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.11.0
>
>
> For the following job:
> {code}
> import logging
> import os
> import shutil
> import sys
> import tempfile
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
>
>
> def word_count():
>     content = "line Licensed to the Apache Software Foundation ASF under one " \
>               "line or more contributor license agreements See the NOTICE file " \
>               "line distributed with this work for additional information " \
>               "line regarding copyright ownership The ASF licenses this file " \
>               "to you under the Apache License Version the " \
>               "License you may not use this file except in compliance " \
>               "with the License"
>
>     t_config = TableConfig()
>     env = StreamExecutionEnvironment.get_execution_environment()
>     t_env = StreamTableEnvironment.create(env, t_config)
>
>     # register Results table in table environment
>     tmp_dir = tempfile.gettempdir()
>     result_path = tmp_dir + '/result'
>     if os.path.exists(result_path):
>         try:
>             if os.path.isfile(result_path):
>                 os.remove(result_path)
>             else:
>                 shutil.rmtree(result_path)
>         except OSError as e:
>             logging.error("Error removing directory: %s - %s.", e.filename, e.strerror)
>
>     logging.info("Results directory: %s", result_path)
>
>     sink_ddl = """
>         create table Results(
>             word VARCHAR,
>             `count` BIGINT
>         ) with (
>             'connector' = 'blackhole'
>         )
>     """
>     t_env.sql_update(sink_ddl)
>
>     @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
>     def inc(count):
>         return count + 1
>
>     t_env.register_function("inc", inc)
>
>     elements = [(word, 1) for word in content.split(" ")]
>     t_env.from_elements(elements, ["word", "count"]) \
>          .group_by("word") \
>          .select("word, count(1) as count") \
>          .select("word, inc(count) as count") \
>          .insert_into("Results")
>
>     t_env.execute("word_count")
>
>
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>
>     word_count()
> {code}
> It throws the following exception if the RocksDB state backend is used:
> {code}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
>     ... 9 more
> Caused by: java.io.IOException: Failed to acquire shared cache resource for RocksDB
>     at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:212)
>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:516)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>     ... 11 more
> Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not created the shared memory resource of size 536870920. Not enough memory left to reserve from the slot's managed memory.
>     at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:603)
>     at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
>     at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
>     at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:617)
>     at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:566)
>     at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:208)
>     ... 15 more
> Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 536870920 bytes. Only 454033416 bytes are remaining.
>     at org.apache.flink.runtime.memory.MemoryManager.reserveMemory(MemoryManager.java:461)
>     at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:601)
>     ... 20 more
> {code}
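The innermost cause in the trace is a plain budget check: RocksDB asked for 536870920 bytes of the slot's managed memory (512 MiB plus 8 bytes) while only 454033416 bytes were left, a shortfall of 82837504 bytes (exactly 79 MiB). The sketch below replays that check with a toy budget. It assumes, as the issue title suggests, that the Python UDF workers and RocksDB draw from the same slot's managed memory; `SlotManagedMemory`, `MemoryReservationError` and the default 256 MiB Python share are hypothetical and are not Flink's `MemoryManager` API. Only the two byte counts come from the trace.

```python
from typing import Dict


class MemoryReservationError(Exception):
    """Hypothetical stand-in for Flink's MemoryReservationException."""


class SlotManagedMemory:
    """Toy budget for one slot's managed memory, shared by every consumer in it.

    Illustration only; this is not Flink's MemoryManager.
    """

    def __init__(self, total_bytes: int) -> None:
        self.total_bytes = total_bytes
        self.reserved: Dict[str, int] = {}

    @property
    def remaining(self) -> int:
        return self.total_bytes - sum(self.reserved.values())

    def reserve(self, owner: str, size: int) -> None:
        """Reserve `size` bytes for `owner`, or fail if the slot cannot cover it."""
        if size > self.remaining:
            raise MemoryReservationError(
                f"Could not allocate {size} bytes. "
                f"Only {self.remaining} bytes are remaining."
            )
        self.reserved[owner] = self.reserved.get(owner, 0) + size


def reproduce_trace_numbers(python_share: int = 256 * 1024 * 1024) -> str:
    """Replay the failing reservation with the byte counts from the trace.

    `python_share` is a made-up size for what the Python workers took first;
    the slot is sized so that exactly 454033416 bytes are left afterwards.
    """
    slot = SlotManagedMemory(total_bytes=python_share + 454033416)
    slot.reserve("python", python_share)
    try:
        slot.reserve("rocksdb", 536870920)  # RocksDB's shared cache request
    except MemoryReservationError as err:
        return str(err)
    return "ok"
```

Calling `reproduce_trace_numbers()` returns the same message as the innermost `MemoryReservationException` above, independent of the hypothetical Python share, because the slot is always sized to leave 454033416 bytes.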
--
This message was sent by Atlassian Jira
(v8.3.4#803005)