This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f231ac  [FLINK-10582][rest] Make REST executor's thread priority configurable
3f231ac is described below

commit 3f231ac46537654aebdd73b0f42aa823386bf901
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Oct 17 16:45:38 2018 +0200

    [FLINK-10582][rest] Make REST executor's thread priority configurable
    
    Introduce RestOptions#SERVER_THREAD_PRIORITY("rest.server.thread-priority") to configure the
    thread priority of the REST executor's threads.
---
 docs/_includes/generated/rest_configuration.html            |  5 +++++
 .../java/org/apache/flink/configuration/RestOptions.java    |  9 +++++++++
 .../AbstractDispatcherResourceManagerComponentFactory.java  |  1 +
 .../org/apache/flink/runtime/minicluster/MiniCluster.java   |  1 +
 .../apache/flink/runtime/webmonitor/WebMonitorEndpoint.java | 13 +++++++++++--
 5 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/docs/_includes/generated/rest_configuration.html 
b/docs/_includes/generated/rest_configuration.html
index 1aa963f..2c5f539 100644
--- a/docs/_includes/generated/rest_configuration.html
+++ b/docs/_includes/generated/rest_configuration.html
@@ -62,5 +62,10 @@
             <td style="word-wrap: break-word;">4</td>
             <td>The number of threads for the asynchronous processing of 
requests.</td>
         </tr>
+        <tr>
+            <td><h5>rest.server.thread-priority</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Thread priority of the REST server's executor for processing 
asynchronous requests. Lowering the thread priority will give Flink's main 
components more CPU time whereas increasing will allocate more time for the 
REST server's processing.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index edfd39b..11c38de 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -19,6 +19,7 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.description.Description;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
@@ -121,4 +122,12 @@ public class RestOptions {
                key("rest.server.numThreads")
                        .defaultValue(4)
                        .withDescription("The number of threads for the 
asynchronous processing of requests.");
+
+       public static final ConfigOption<Integer> SERVER_THREAD_PRIORITY = 
key("rest.server.thread-priority")
+               .defaultValue(Thread.NORM_PRIORITY)
+               .withDescription(Description.builder()
+                       .text("Thread priority of the REST server's executor 
for processing asynchronous requests. " +
+                               "Lowering the thread priority will give Flink's 
main components more CPU time whereas " +
+                               "increasing will allocate more time for the 
REST server's processing.")
+                       .build());
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index c09c41b..6d557d0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -141,6 +141,7 @@ public abstract class 
AbstractDispatcherResourceManagerComponentFactory<T extend
                                blobServer,
                                WebMonitorEndpoint.createExecutorService(
                                        
configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
+                                       
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                                        "DispatcherRestEndpoint"),
                                new AkkaQueryServiceRetriever(actorSystem, 
timeout),
                                
highAvailabilityServices.getWebMonitorLeaderElectionService(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 3fc7007..69bad47 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -356,6 +356,7 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                                        blobServer.getTransientBlobService(),
                                        
WebMonitorEndpoint.createExecutorService(
                                                
configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
+                                               
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                                                "DispatcherRestEndpoint"),
                                        new AkkaQueryServiceRetriever(
                                                metricQueryServiceActorSystem,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 02d92dc..c480c33 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -783,11 +783,20 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                return archivedJson;
        }
 
-       public static ExecutorService createExecutorService(int numThreads, 
String componentName) {
+       public static ExecutorService createExecutorService(int numThreads, int 
threadPriority, String componentName) {
+               if (threadPriority < Thread.MIN_PRIORITY || threadPriority > 
Thread.MAX_PRIORITY) {
+                       throw new IllegalArgumentException(
+                               String.format(
+                                       "The thread priority must be within 
(%s, %s) but it was %s.",
+                                       Thread.MIN_PRIORITY,
+                                       Thread.MAX_PRIORITY,
+                                       threadPriority));
+               }
+
                return Executors.newFixedThreadPool(
                        numThreads,
                        new ExecutorThreadFactory.Builder()
-                               .setThreadPriority(Thread.MIN_PRIORITY)
+                               .setThreadPriority(threadPriority)
                                .setPoolName("Flink-" + componentName)
                                .build());
        }

Reply via email to