[ 
https://issues.apache.org/jira/browse/FLINK-18738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176128#comment-17176128
 ] 

Xintong Song commented on FLINK-18738:
--------------------------------------

Thanks for the feedback, [~trohrmann] & [~sewen].

 

To [~trohrmann],
{quote}I think it would be nice to solve this problem not only for Python but 
also for other languages we might want to support in the future (if possible 
and if it does not broaden the scope too massively).
{quote}
I like the idea of not only solving this problem for Python, but providing a 
general mechanism for all non-Java languages we might support in the future.
{quote}The first question which comes to my mind is whether the 
{{TaskExecutor}} should be responsible for managing the component running the 
Python (or any other language) code or not. If there is no good reason for the 
{{TaskExecutor}} to manage this component, it could make the {{TaskExecutor}} 
simpler by not aggregating too many responsibilities. If the language execution 
service benefitted from a tight integration in the execution of a job (e.g. by 
allocating and releasing resources swiftly) on the other hand, then a tighter 
integration could make sense.
{quote}
I think the biggest benefit of integrating the external language execution 
service into task managers is that we can reuse the existing task scheduling 
and deployment mechanisms. To be specific, to manage the external language 
execution service completely independently of task managers, we would probably 
need to deal with the following issues.
 * Lifecycle management: Active resource managers or users need to request 
resources for external language executors and launch them. Resource managers 
need to accept executor registrations and establish connections and heartbeats.
 * Scheduling: Do external language executors follow the slot-based scheduling? 
If not, how do we decide how many tasks should be scheduled onto each executor? 
And which executor should a task be scheduled onto?
 * Deploying: Who is responsible for deploying tasks into external language 
executors, job masters or Java tasks in task managers? The former requires the 
job master to be aware of different kinds of executors and to establish 
connections with each of them. For the latter, how do the Java tasks know which 
executor to deploy the workload to?
 * Do the external language executors need to support data shuffling?
 * How do they access the state backends? Do the external executors need to 
be colocated with the task managers?

For most of the above-mentioned issues, we already have solutions in task 
managers. It would be much simpler if we could reuse them directly. Integrating 
the external language execution service with task managers might complicate the 
task manager a bit, but it should make things simpler for the other components.

We could further scope the complexity out of the TaskExecutor with a 
plugin-based approach, similar to state backends. I will describe this later.
{quote}Whether a supported language supports code isolation or not should not 
necessarily decide which process model to choose.
{quote}
Makes sense to me.
{quote}Concerning the proposed memory management: Does option 2. entail that 
one enables the cluster to run with Python at deployment time of the 
{{TaskExecutor}}? Differently asked will we reserve a fixed amount of memory 
for the Python process independent of whether we actually run a Python workload 
or not?
{quote}
True. We could make the Python memory 0 by default, but option 2) means that as 
long as it is configured non-zero, the memory will be reserved with or without a 
Python workload.
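To make the trade-off concrete, here is a toy sketch of the option-2 accounting (class and method names are made up for illustration, not actual Flink config keys or APIs): a fixed Python budget is carved out at TaskExecutor deployment time, independent of whether a Python workload ever runs.

```java
// Toy model of option 2: the Python memory budget is reserved up-front at
// deployment time. If the job contains no Python operators, that budget
// simply sits idle. All names here are hypothetical.
public class StaticPythonBudget {
    private final long managedMemoryMb;
    private final long pythonMemoryMb; // 0 by default; reserved up-front if non-zero

    public StaticPythonBudget(long managedMemoryMb, long pythonMemoryMb) {
        this.managedMemoryMb = managedMemoryMb;
        this.pythonMemoryMb = pythonMemoryMb;
    }

    /** Total memory the deployment must provision for this TaskExecutor. */
    public long totalMb() {
        return managedMemoryMb + pythonMemoryMb;
    }

    /** Memory that sits idle if the job runs no Python workload. */
    public long idleMbWithoutPythonWorkload() {
        return pythonMemoryMb;
    }
}
```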

 

To [~sewen],
{quote}Generally, going with one Python process per slot seems fair, but that 
means the Python process executes multiple operators.
How well does that work? Is there one gRPC connection that handles all 
operators, or are there multiple streams between the processes (one per 
operator)? Are there any issues with GIL or so that impact performance?
{quote}
I'm not absolutely sure about this. Maybe [~dian.fu] could share some input.
{quote}The managed memory integration on the other hand could be quite simple. 
The Python process could be a simple external resource claiming budget from the 
Memory Manager. The per-slot bookkeeping, the singleton initialization, ref 
counting, thread-safe shutdown, all of that is already there from the RocksDB 
integration. It should be straightforward to just reuse this.

The advantage of this is that there is
 - simple, no extended memory model
 - no wasted memory in session clusters that are used in a mixed way (this is 
probably not too important to optimize for, though).{quote}
Thanks, Stephan. I think this is a really good suggestion, to reuse the shared 
resource mechanism in the memory manager. I think no wasted memory is an 
important advantage, not only for session clusters, but also for scenarios with 
multiple slot sharing groups, and for batch workloads.

The shortcoming is, as previously mentioned, the coupling between RocksDB and 
Python memory. I think we should be able to work that out, and the benefit 
sounds worth the effort. 
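The ref-counted shared-resource pattern Stephan describes can be sketched roughly as follows. This is a simplified illustration of the general mechanism (Flink's MemoryManager uses something similar for the RocksDB block cache); none of the names below are the actual Flink API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;

// Sketch of a ref-counted shared resource: the first slot to request a
// resource of a given type initializes it with a fraction of the total
// budget; later slots share it; closing the last lease releases the budget.
public class SharedResourceRegistry {

    /** A per-slot lease on a shared resource. */
    public interface Lease extends AutoCloseable {
        long size();
        @Override void close();
    }

    private static final class Entry {
        final long size;
        int refCount;
        Entry(long size) { this.size = size; }
    }

    private final Map<String, Entry> resources = new HashMap<>();
    private final long totalBudget;
    private long reserved;

    public SharedResourceRegistry(long totalBudget) {
        this.totalBudget = totalBudget;
    }

    /** First caller per type runs the initializer; later callers share the result. */
    public synchronized Lease acquire(
            String type, LongFunction<Long> initializer, double fraction) {
        Entry entry = resources.get(type);
        if (entry == null) {
            long size = initializer.apply((long) (totalBudget * fraction));
            if (reserved + size > totalBudget) {
                throw new IllegalStateException("shared resource budget exceeded");
            }
            reserved += size;
            entry = new Entry(size);
            resources.put(type, entry);
        }
        entry.refCount++;
        final Entry e = entry;
        return new Lease() {
            private boolean closed;
            @Override public long size() { return e.size; }
            @Override public synchronized void close() {
                if (closed) return;
                closed = true;
                release(type);
            }
        };
    }

    private synchronized void release(String type) {
        Entry entry = resources.get(type);
        if (entry != null && --entry.refCount == 0) {
            // Last lease gone: shut the resource down and return its budget.
            resources.remove(type);
            reserved -= entry.size;
        }
    }

    public synchronized long reservedBytes() { return reserved; }
}
```

Under this model a "python" resource would only claim budget once a Python workload actually acquires a lease, which is exactly the no-wasted-memory property discussed above.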

 

*External Language Execution Framework*

The idea of reusing the memory sharing mechanism from RocksDB makes me realize 
how similar the external language execution service is to the RocksDB state 
backend. Basically, they both claim a memory budget from the memory manager and 
provide services to operators.

If we could define some sort of `ExternalExecutionBackend` interface, then most 
of the details and complexity could be hidden in the implementation of the 
interface. The task manager would not need to know how the backend uses the 
memory, or whether it calls a library, launches a process, or connects to an 
external service. Whether the backends share the same Python process or each 
uses a dedicated process would also be hidden inside the backend implementation.

We could easily extend Flink with support for more languages by providing new 
implementations of the backend interface. Ideally, the process model and memory 
management of the Flink framework would need no changes. 
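As a rough illustration of the proposal, the interface could look something like the sketch below. None of these names exist in Flink; the point is only that the TaskExecutor sees this contract, while the process model, IPC, and memory use stay inside the implementation.

```java
// Hypothetical plugin interface for external language execution backends.
// The TaskExecutor would interact only with this contract; whether the
// backend launches a process, calls a library, or connects to a remote
// service is an implementation detail.
public interface ExternalExecutionBackend extends AutoCloseable {

    /** Handle to an operator running in the external runtime. */
    interface OperatorHandle {
        String id();
    }

    /** Language / runtime this backend provides, e.g. "python". */
    String language();

    /** Claim a memory budget, analogous to a state backend claiming managed memory. */
    void reserveMemory(long bytes);

    /** Hand a serialized operator to the external runtime. */
    OperatorHandle deployOperator(byte[] serializedOperator);

    @Override
    void close();
}

// Trivial in-memory stub, just to show how a backend would plug in.
class StubPythonBackend implements ExternalExecutionBackend {
    private long reserved;
    private int deployed;

    @Override public String language() { return "python"; }

    @Override public void reserveMemory(long bytes) { reserved += bytes; }

    @Override public OperatorHandle deployOperator(byte[] serializedOperator) {
        final String id = "python-op-" + (++deployed);
        return () -> id;
    }

    @Override public void close() { reserved = 0; }

    public long reservedBytes() { return reserved; }
}
```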

> Revisit resource management model for python processes.
> -------------------------------------------------------
>
>                 Key: FLINK-18738
>                 URL: https://issues.apache.org/jira/browse/FLINK-18738
>             Project: Flink
>          Issue Type: Task
>          Components: API / Python, Runtime / Coordination
>            Reporter: Xintong Song
>            Assignee: Xintong Song
>            Priority: Major
>             Fix For: 1.12.0
>
>
> This ticket is for tracking the effort towards a proper long-term resource 
> management model for python processes.
> In FLINK-17923, we ran into problems because python processes are not well 
> integrated with the task manager resource management mechanism. A temporary 
> workaround has been merged for release-1.11.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)