This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3f231ac [FLINK-10582][rest] Make REST executor's thread priority
configurable
3f231ac is described below
commit 3f231ac46537654aebdd73b0f42aa823386bf901
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Oct 17 16:45:38 2018 +0200
[FLINK-10582][rest] Make REST executor's thread priority configurable
Introduce RestOptions#SERVER_THREAD_PRIORITY("rest.server.thread-priority")
to configure the
thread priority of the REST executor's threads.
---
docs/_includes/generated/rest_configuration.html | 5 +++++
.../java/org/apache/flink/configuration/RestOptions.java | 9 +++++++++
.../AbstractDispatcherResourceManagerComponentFactory.java | 1 +
.../org/apache/flink/runtime/minicluster/MiniCluster.java | 1 +
.../apache/flink/runtime/webmonitor/WebMonitorEndpoint.java | 13 +++++++++++--
5 files changed, 27 insertions(+), 2 deletions(-)
diff --git a/docs/_includes/generated/rest_configuration.html
b/docs/_includes/generated/rest_configuration.html
index 1aa963f..2c5f539 100644
--- a/docs/_includes/generated/rest_configuration.html
+++ b/docs/_includes/generated/rest_configuration.html
@@ -62,5 +62,10 @@
<td style="word-wrap: break-word;">4</td>
<td>The number of threads for the asynchronous processing of
requests.</td>
</tr>
+ <tr>
+ <td><h5>rest.server.thread-priority</h5></td>
+ <td style="word-wrap: break-word;">5</td>
+ <td>Thread priority of the REST server's executor for processing
asynchronous requests. Lowering the thread priority will give Flink's main
components more CPU time whereas increasing will allocate more time for the
REST server's processing.</td>
+ </tr>
</tbody>
</table>
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index edfd39b..11c38de 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -19,6 +19,7 @@
package org.apache.flink.configuration;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.description.Description;
import static org.apache.flink.configuration.ConfigOptions.key;
@@ -121,4 +122,12 @@ public class RestOptions {
key("rest.server.numThreads")
.defaultValue(4)
.withDescription("The number of threads for the
asynchronous processing of requests.");
+
+ public static final ConfigOption<Integer> SERVER_THREAD_PRIORITY =
key("rest.server.thread-priority")
+ .defaultValue(Thread.NORM_PRIORITY)
+ .withDescription(Description.builder()
+ .text("Thread priority of the REST server's executor
for processing asynchronous requests. " +
+ "Lowering the thread priority will give Flink's
main components more CPU time whereas " +
+ "increasing will allocate more time for the
REST server's processing.")
+ .build());
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index c09c41b..6d557d0 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -141,6 +141,7 @@ public abstract class
AbstractDispatcherResourceManagerComponentFactory<T extend
blobServer,
WebMonitorEndpoint.createExecutorService(
configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
+
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
"DispatcherRestEndpoint"),
new AkkaQueryServiceRetriever(actorSystem,
timeout),
highAvailabilityServices.getWebMonitorLeaderElectionService(),
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 3fc7007..69bad47 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -356,6 +356,7 @@ public class MiniCluster implements JobExecutorService,
AutoCloseableAsync {
blobServer.getTransientBlobService(),
WebMonitorEndpoint.createExecutorService(
configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
+
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
"DispatcherRestEndpoint"),
new AkkaQueryServiceRetriever(
metricQueryServiceActorSystem,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 02d92dc..c480c33 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -783,11 +783,20 @@ public class WebMonitorEndpoint<T extends RestfulGateway>
extends RestServerEndp
return archivedJson;
}
- public static ExecutorService createExecutorService(int numThreads,
String componentName) {
+ public static ExecutorService createExecutorService(int numThreads, int
threadPriority, String componentName) {
+ if (threadPriority < Thread.MIN_PRIORITY || threadPriority >
Thread.MAX_PRIORITY) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The thread priority must be within
(%s, %s) but it was %s.",
+ Thread.MIN_PRIORITY,
+ Thread.MAX_PRIORITY,
+ threadPriority));
+ }
+
return Executors.newFixedThreadPool(
numThreads,
new ExecutorThreadFactory.Builder()
- .setThreadPriority(Thread.MIN_PRIORITY)
+ .setThreadPriority(threadPriority)
.setPoolName("Flink-" + componentName)
.build());
}