[ 
https://issues.apache.org/jira/browse/FLINK-23654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicolas Raga updated FLINK-23654:
---------------------------------
    Description: 
The JobManagerSharedServices futureExecutor is used for asynchronous requests 
in multiple Flink components. When the JobMaster creates the execution graph, 
it passes the *scheduledExecutorService* (which is the 
jobManagerSharedServices.getScheduledExecutorService) as both the 
*futureExecutor* and the *ioExecutor*. In the ExecutionGraph, the *ioExecutor* 
is the executor used to run blocking I/O operations. It is also passed to the 
*CheckpointCoordinator*, which uses it for asynchronous calls such as 
disposing of pending checkpoints and cleaning up failed checkpoints. The 
*futureExecutor* is even passed on to the *Execution* class, where it is used 
to dispatch callbacks from futures and asynchronous RPC calls from within 
vertices. Lastly, this executor is also used to process asynchronous requests 
from the Flink REST endpoint.
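
The contention described above can be reproduced outside Flink with plain 
java.util.concurrent classes. The following is only a minimal sketch, not Flink 
code: the class name SharedPoolContention, the method restRequestWasStarved, 
the 200 ms timeout, and the "overview" payload are all hypothetical stand-ins. 
A blocking task plays the role of checkpoint cleanup and a trivial task plays 
the role of a REST request on the same pool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration; none of these names exist in Flink.
final class SharedPoolContention {

    /**
     * Submits one blocking "I/O" task and one cheap "REST" task to the same pool.
     * Returns true if the REST task could not finish within 200 ms while the
     * blocking task was still holding a thread.
     */
    static boolean restRequestWasStarved(int poolSize) throws Exception {
        ExecutorService sharedPool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch ioStarted = new CountDownLatch(1);
        CountDownLatch releaseIo = new CountDownLatch(1);
        try {
            // Stand-in for blocking I/O such as cleaning up a failed checkpoint.
            sharedPool.submit(() -> {
                ioStarted.countDown();
                releaseIo.await(); // occupies a pool thread until released
                return null;
            });
            ioStarted.await();

            // Stand-in for an asynchronous REST handler on the same executor.
            Future<String> rest = sharedPool.submit(() -> "overview");

            boolean starved;
            try {
                rest.get(200, TimeUnit.MILLISECONDS);
                starved = false;
            } catch (TimeoutException e) {
                starved = true;
            }

            // Let the blocking task finish so the queued request can complete.
            releaseIo.countDown();
            rest.get(5, TimeUnit.SECONDS);
            return starved;
        } finally {
            releaseIo.countDown();
            sharedPool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("1 thread starved:  " + restRequestWasStarved(1));
        System.out.println("2 threads starved: " + restRequestWasStarved(2));
    }
}
```

With a single thread the REST stand-in has to wait behind the blocking task. 
With a second thread it can be served immediately, which matches the 
observation that a larger thread count improves response times.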

 

Hence, using the endpoint for monitoring while large checkpoints or blocking 
I/O operations run on the same thread pool degrades the endpoint's 
performance. Our tests have already shown that increasing this thread count 
yields faster responses to incoming requests. We can begin by simply exposing 
a *jobmanager.future-thread.factor* option that sets the thread count as a 
multiple of the number of CPUs. Afterwards, we can consider a dedicated thread 
pool for blocking I/O so that it does not degrade the REST endpoint's 
performance.
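
As a rough sketch of the first step, the pool size could be derived from the 
CPU count and the proposed factor. This assumes the factor is a multiplier 
applied to the number of available processors, which is one reading of the 
proposal. The class FuturePoolSizing, the method poolSize, and the value 2.0 
are hypothetical and are not part of Flink's configuration API.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical illustration; not Flink's actual implementation.
final class FuturePoolSizing {

    /**
     * Computes a thread count as availableCpus * factor, rounded up,
     * with a lower bound of one thread.
     */
    static int poolSize(int availableCpus, double factor) {
        if (availableCpus < 1) {
            throw new IllegalArgumentException("availableCpus must be >= 1");
        }
        if (!(factor > 0)) { // also rejects NaN
            throw new IllegalArgumentException("factor must be > 0");
        }
        return Math.max(1, (int) Math.ceil(availableCpus * factor));
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // Assumed value that would be read from jobmanager.future-thread.factor.
        double factor = 2.0;

        int threads = poolSize(cpus, factor);
        ScheduledExecutorService futureExecutor =
                Executors.newScheduledThreadPool(threads);
        try {
            System.out.println("future executor threads: " + threads);
        } finally {
            futureExecutor.shutdown();
        }
    }
}
```

Rounding up and clamping to at least one thread keeps the pool usable even 
when a fractional factor is configured on a small machine.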

  was:
The JobManagerSharedServices futureExecutor is used for asynchronous requests 
in multiple Flink components. When the JobMaster creates the execution graph, 
it passes the *scheduledExecutorService* (which is the 
jobManagerSharedServices.getScheduledExecutorService) as both the 
*futureExecutor* and the *ioExecutor*. In the ExecutionGraph, the *ioExecutor* 
is the executor used to run blocking I/O operations. It is also passed to the 
*CheckpointCoordinator*, which uses it for asynchronous calls such as 
disposing of pending checkpoints and cleaning up failed checkpoints. The 
*futureExecutor* is even passed on to the *Execution* class, where it is used 
to dispatch callbacks from futures and asynchronous RPC calls from within 
vertices.

Lastly, this executor is also used to process asynchronous requests from the 
Flink REST endpoint. Hence, using the endpoint for monitoring during the 
emission of large checkpoints or blocking operations on this thread pool 
causes degraded performance. Our tests have already shown that increasing this 
thread count yields faster responses to incoming requests. 


> Allow configuration for number of jobmanager-future threads
> -----------------------------------------------------------
>
>                 Key: FLINK-23654
>                 URL: https://issues.apache.org/jira/browse/FLINK-23654
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / REST
>            Reporter: Nicolas Raga
>            Priority: Critical
>
> The JobManagerSharedServices futureExecutor is used for asynchronous 
> requests in multiple Flink components. When the JobMaster creates the 
> execution graph, it passes the *scheduledExecutorService* (which is the 
> jobManagerSharedServices.getScheduledExecutorService) as both the 
> *futureExecutor* and the *ioExecutor*. In the ExecutionGraph, the 
> *ioExecutor* is the executor used to run blocking I/O operations. It is also 
> passed to the *CheckpointCoordinator*, which uses it for asynchronous calls 
> such as disposing of pending checkpoints and cleaning up failed checkpoints. 
> The *futureExecutor* is even passed on to the *Execution* class, where it is 
> used to dispatch callbacks from futures and asynchronous RPC calls from 
> within vertices. Lastly, this executor is also used to process asynchronous 
> requests from the Flink REST endpoint.
>  
> Hence, using the endpoint for monitoring while large checkpoints or blocking 
> I/O operations run on the same thread pool degrades the endpoint's 
> performance. Our tests have already shown that increasing this thread count 
> yields faster responses to incoming requests. We can begin by simply 
> exposing a *jobmanager.future-thread.factor* option that sets the thread 
> count as a multiple of the number of CPUs. Afterwards, we can consider a 
> dedicated thread pool for blocking I/O so that it does not degrade the REST 
> endpoint's performance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
