This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 967b31b  [FLINK-10582][rest] Make REST executor's thread priority configurable
967b31b is described below

commit 967b31b333e6f4b014ea3041f420bfaff2484618
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Oct 17 16:44:05 2018 +0200

    [FLINK-10582][rest] Make REST executor's thread priority configurable
    
    Introduce RestOptions#SERVER_THREAD_PRIORITY("rest.server.thread-priority") to configure the
    thread priority of the REST executor's threads.
---
 docs/_includes/generated/rest_configuration.html            |  5 +++++
 .../java/org/apache/flink/configuration/RestOptions.java    |  9 +++++++++
 .../apache/flink/runtime/entrypoint/ClusterEntrypoint.java  |  1 +
 .../org/apache/flink/runtime/minicluster/MiniCluster.java   |  1 +
 .../apache/flink/runtime/webmonitor/WebMonitorEndpoint.java | 13 +++++++++++--
 5 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/docs/_includes/generated/rest_configuration.html b/docs/_includes/generated/rest_configuration.html
index 1aa963f..2c5f539 100644
--- a/docs/_includes/generated/rest_configuration.html
+++ b/docs/_includes/generated/rest_configuration.html
@@ -62,5 +62,10 @@
             <td style="word-wrap: break-word;">4</td>
             <td>The number of threads for the asynchronous processing of requests.</td>
         </tr>
+        <tr>
+            <td><h5>rest.server.thread-priority</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Thread priority of the REST server's executor for processing asynchronous requests. Lowering the thread priority will give Flink's main components more CPU time whereas increasing will allocate more time for the REST server's processing.</td>
+        </tr>
     </tbody>
 </table>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index edfd39b..11c38de 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -19,6 +19,7 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.description.Description;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
@@ -121,4 +122,12 @@ public class RestOptions {
                key("rest.server.numThreads")
                        .defaultValue(4)
                        .withDescription("The number of threads for the asynchronous processing of requests.");
+
+       public static final ConfigOption<Integer> SERVER_THREAD_PRIORITY = key("rest.server.thread-priority")
+               .defaultValue(Thread.NORM_PRIORITY)
+               .withDescription(Description.builder()
+                       .text("Thread priority of the REST server's executor for processing asynchronous requests. " +
+                               "Lowering the thread priority will give Flink's main components more CPU time whereas " +
+                               "increasing will allocate more time for the REST server's processing.")
+                       .build());
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index fd0a0a1..4f575ce 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -331,6 +331,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
                                transientBlobCache,
                                WebMonitorEndpoint.createExecutorService(
                                        configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
+                                       configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                                        "DispatcherRestEndpoint"),
                                new AkkaQueryServiceRetriever(actorSystem, timeout),
                                highAvailabilityServices.getWebMonitorLeaderElectionService());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 4bfdb25..1d2c8c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -348,6 +348,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
                                        blobServer.getTransientBlobService(),
                                        WebMonitorEndpoint.createExecutorService(
                                                configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
+                                               configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                                                "DispatcherRestEndpoint"),
                                        new AkkaQueryServiceRetriever(
                                                actorSystem,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 02d92dc..c480c33 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -783,11 +783,20 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
                return archivedJson;
        }
 
-       public static ExecutorService createExecutorService(int numThreads, String componentName) {
+       public static ExecutorService createExecutorService(int numThreads, int threadPriority, String componentName) {
+               if (threadPriority < Thread.MIN_PRIORITY || threadPriority > Thread.MAX_PRIORITY) {
+                       throw new IllegalArgumentException(
+                               String.format(
+                                       "The thread priority must be within (%s, %s) but it was %s.",
+                                       Thread.MIN_PRIORITY,
+                                       Thread.MAX_PRIORITY,
+                                       threadPriority));
+               }
+
                return Executors.newFixedThreadPool(
                        numThreads,
                        new ExecutorThreadFactory.Builder()
-                               .setThreadPriority(Thread.MIN_PRIORITY)
+                               .setThreadPriority(threadPriority)
                                .setPoolName("Flink-" + componentName)
                                .build());
        }

Reply via email to