This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.5 by this push:
new 98f3828 [FLINK-10582][rest] Make REST executor's thread priority configurable
98f3828 is described below
commit 98f38288a12065dd665f7e0b2420d57f6408121a
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Oct 17 16:43:11 2018 +0200
[FLINK-10582][rest] Make REST executor's thread priority configurable
---
docs/_includes/generated/rest_configuration.html | 5 +++++
.../java/org/apache/flink/configuration/RestOptions.java | 9 +++++++++
.../apache/flink/runtime/entrypoint/ClusterEntrypoint.java | 1 +
.../org/apache/flink/runtime/minicluster/MiniCluster.java | 1 +
.../apache/flink/runtime/webmonitor/WebMonitorEndpoint.java | 13 +++++++++++--
5 files changed, 27 insertions(+), 2 deletions(-)
diff --git a/docs/_includes/generated/rest_configuration.html b/docs/_includes/generated/rest_configuration.html
index 1aa963f..2c5f539 100644
--- a/docs/_includes/generated/rest_configuration.html
+++ b/docs/_includes/generated/rest_configuration.html
@@ -62,5 +62,10 @@
<td style="word-wrap: break-word;">4</td>
<td>The number of threads for the asynchronous processing of requests.</td>
</tr>
+ <tr>
+ <td><h5>rest.server.thread-priority</h5></td>
+ <td style="word-wrap: break-word;">5</td>
+ <td>Thread priority of the REST server's executor for processing asynchronous requests. Lowering the thread priority will give Flink's main components more CPU time whereas increasing will allocate more time for the REST server's processing.</td>
+ </tr>
</tbody>
</table>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index edfd39b..11c38de 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -19,6 +19,7 @@
package org.apache.flink.configuration;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.description.Description;
import static org.apache.flink.configuration.ConfigOptions.key;
@@ -121,4 +122,12 @@ public class RestOptions {
key("rest.server.numThreads")
.defaultValue(4)
.withDescription("The number of threads for the asynchronous processing of requests.");
+
+ public static final ConfigOption<Integer> SERVER_THREAD_PRIORITY =
key("rest.server.thread-priority")
+ .defaultValue(Thread.NORM_PRIORITY)
+ .withDescription(Description.builder()
+ .text("Thread priority of the REST server's executor for processing asynchronous requests. " +
+ "Lowering the thread priority will give Flink's main components more CPU time whereas " +
+ "increasing will allocate more time for the REST server's processing.")
+ .build());
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 2c34b53..c4c5838 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -330,6 +330,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
transientBlobCache,
WebMonitorEndpoint.createExecutorService(
configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
+ configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
"DispatcherRestEndpoint"),
new AkkaQueryServiceRetriever(actorSystem, timeout),
highAvailabilityServices.getWebMonitorLeaderElectionService());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index a7840d6..74ff770 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -350,6 +350,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
blobServer.getTransientBlobService(),
WebMonitorEndpoint.createExecutorService(
configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
+ configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
"DispatcherRestEndpoint"),
new AkkaQueryServiceRetriever(
actorSystem,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 02d92dc..c480c33 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -783,11 +783,20 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
return archivedJson;
}
- public static ExecutorService createExecutorService(int numThreads, String componentName) {
+ public static ExecutorService createExecutorService(int numThreads, int threadPriority, String componentName) {
+ if (threadPriority < Thread.MIN_PRIORITY || threadPriority > Thread.MAX_PRIORITY) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The thread priority must be within (%s, %s) but it was %s.",
+ Thread.MIN_PRIORITY,
+ Thread.MAX_PRIORITY,
+ threadPriority));
+ }
+
return Executors.newFixedThreadPool(
numThreads,
new ExecutorThreadFactory.Builder()
- .setThreadPriority(Thread.MIN_PRIORITY)
+ .setThreadPriority(threadPriority)
.setPoolName("Flink-" + componentName)
.build());
}