This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new 967b31b [FLINK-10582][rest] Make REST executor's thread priority configurable
967b31b is described below
commit 967b31b333e6f4b014ea3041f420bfaff2484618
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Oct 17 16:44:05 2018 +0200
[FLINK-10582][rest] Make REST executor's thread priority configurable
Introduce RestOptions#SERVER_THREAD_PRIORITY ("rest.server.thread-priority")
to configure the thread priority of the REST executor's threads.
---
docs/_includes/generated/rest_configuration.html | 5 +++++
.../java/org/apache/flink/configuration/RestOptions.java | 9 +++++++++
.../apache/flink/runtime/entrypoint/ClusterEntrypoint.java | 1 +
.../org/apache/flink/runtime/minicluster/MiniCluster.java | 1 +
.../apache/flink/runtime/webmonitor/WebMonitorEndpoint.java | 13 +++++++++++--
5 files changed, 27 insertions(+), 2 deletions(-)
diff --git a/docs/_includes/generated/rest_configuration.html
b/docs/_includes/generated/rest_configuration.html
index 1aa963f..2c5f539 100644
--- a/docs/_includes/generated/rest_configuration.html
+++ b/docs/_includes/generated/rest_configuration.html
@@ -62,5 +62,10 @@
<td style="word-wrap: break-word;">4</td>
<td>The number of threads for the asynchronous processing of
requests.</td>
</tr>
+ <tr>
+ <td><h5>rest.server.thread-priority</h5></td>
+ <td style="word-wrap: break-word;">5</td>
+ <td>Thread priority of the REST server's executor for processing
asynchronous requests. Lowering the thread priority will give Flink's main
components more CPU time whereas increasing will allocate more time for the
REST server's processing.</td>
+ </tr>
</tbody>
</table>
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index edfd39b..11c38de 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -19,6 +19,7 @@
package org.apache.flink.configuration;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.description.Description;
import static org.apache.flink.configuration.ConfigOptions.key;
@@ -121,4 +122,12 @@ public class RestOptions {
key("rest.server.numThreads")
.defaultValue(4)
.withDescription("The number of threads for the
asynchronous processing of requests.");
+
+ public static final ConfigOption<Integer> SERVER_THREAD_PRIORITY =
key("rest.server.thread-priority")
+ .defaultValue(Thread.NORM_PRIORITY)
+ .withDescription(Description.builder()
+ .text("Thread priority of the REST server's executor
for processing asynchronous requests. " +
+ "Lowering the thread priority will give Flink's
main components more CPU time whereas " +
+ "increasing will allocate more time for the
REST server's processing.")
+ .build());
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index fd0a0a1..4f575ce 100755
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -331,6 +331,7 @@ public abstract class ClusterEntrypoint implements
FatalErrorHandler {
transientBlobCache,
WebMonitorEndpoint.createExecutorService(
configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
+
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
"DispatcherRestEndpoint"),
new AkkaQueryServiceRetriever(actorSystem,
timeout),
highAvailabilityServices.getWebMonitorLeaderElectionService());
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 4bfdb25..1d2c8c7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -348,6 +348,7 @@ public class MiniCluster implements JobExecutorService,
AutoCloseableAsync {
blobServer.getTransientBlobService(),
WebMonitorEndpoint.createExecutorService(
configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
+
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
"DispatcherRestEndpoint"),
new AkkaQueryServiceRetriever(
actorSystem,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 02d92dc..c480c33 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -783,11 +783,20 @@ public class WebMonitorEndpoint<T extends RestfulGateway>
extends RestServerEndp
return archivedJson;
}
- public static ExecutorService createExecutorService(int numThreads,
String componentName) {
+ public static ExecutorService createExecutorService(int numThreads, int
threadPriority, String componentName) {
+ if (threadPriority < Thread.MIN_PRIORITY || threadPriority >
Thread.MAX_PRIORITY) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The thread priority must be within
(%s, %s) but it was %s.",
+ Thread.MIN_PRIORITY,
+ Thread.MAX_PRIORITY,
+ threadPriority));
+ }
+
return Executors.newFixedThreadPool(
numThreads,
new ExecutorThreadFactory.Builder()
- .setThreadPriority(Thread.MIN_PRIORITY)
+ .setThreadPriority(threadPriority)
.setPoolName("Flink-" + componentName)
.build());
}