This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 39324331979cdedb9832bf06408bb182fc9476fa
Author: yanghua <[email protected]>
AuthorDate: Fri Oct 12 14:05:23 2018 +0800

    [FLINK-10253] Run MetricQueryService with lower priority
---
 docs/_includes/generated/metric_configuration.html |  5 ++
 .../apache/flink/configuration/MetricOptions.java  | 12 ++++
 .../runtime/clusterframework/BootstrapTools.java   |  4 +-
 .../akka/dispatch/PriorityThreadFactory.scala      | 38 +++++++++++
 .../akka/dispatch/PriorityThreadsDispatcher.scala  | 64 ++++++++++++++++++
 .../PriorityThreadsDispatcherPrerequisites.scala   | 78 ++++++++++++++++++++++
 .../org/apache/flink/runtime/akka/AkkaUtils.scala  | 13 +++-
 .../apache/flink/runtime/akka/AkkaUtilsTest.scala  | 40 ++++++++++-
 8 files changed, 248 insertions(+), 6 deletions(-)

diff --git a/docs/_includes/generated/metric_configuration.html 
b/docs/_includes/generated/metric_configuration.html
index 5c073f3..420bb7f 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -13,6 +13,11 @@
             <td>The port range used for Flink's internal metric query service. 
Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination 
of both. It is recommended to set a range of ports to avoid collisions when 
multiple Flink components are running on the same machine. Per default Flink 
will pick a random port.</td>
         </tr>
         <tr>
+            <td><h5>metrics.internal.query-service.thread-priority</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>The thread priority used for Flink's internal metric query 
service. The thread is created by Akka's thread pool executor. The range of the 
priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). Warning, increasing 
this value may bring the main Flink components down.</td>
+        </tr>
+        <tr>
             <td><h5>metrics.latency.granularity</h5></td>
             <td style="word-wrap: break-word;">"operator"</td>
             <td>Defines the granularity of latency metrics. Accepted values 
are:<ul><li>single - Track latency without differentiating between sources and 
subtasks.</li><li>operator - Track latency while differentiating between 
sources, but not subtasks.</li><li>subtask - Track latency while 
differentiating between sources and subtasks.</li></ul></td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index 0e7268e..0785b34 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -156,6 +156,18 @@ public class MetricOptions {
                        "ports to avoid collisions when multiple Flink 
components are running on the same machine. Per default " +
                        "Flink will pick a random port.");
 
+       /**
+        * The thread priority for Flink's internal metric query service. The 
{@code 1} means the min priority and the
+        * {@code 10} means the max priority.
+        */
+       public static final ConfigOption<Integer> QUERY_SERVICE_THREAD_PRIORITY 
=
+               key("metrics.internal.query-service.thread-priority")
+               .defaultValue(1)
+               .withDescription("The thread priority used for Flink's internal 
metric query service. The thread is created" +
+                       " by Akka's thread pool executor. " +
+                       "The range of the priority is from 1 (MIN_PRIORITY) to 
10 (MAX_PRIORITY). " +
+                       "Warning, increasing this value may bring the main 
Flink components down.");
+
        private MetricOptions() {
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 00b6173..430af98 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -290,7 +291,8 @@ public class BootstrapTools {
                        case FORK_JOIN_EXECUTOR:
                                return 
AkkaUtils.getForkJoinExecutorConfig(configuration);
                        case FIXED_THREAD_POOL_EXECUTOR:
-                               return AkkaUtils.getThreadPoolExecutorConfig();
+                               return AkkaUtils.getThreadPoolExecutorConfig(
+                                       
configuration.getInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY));
                        default:
                                throw new 
IllegalArgumentException(String.format("Unknown ActorSystemExecutorMode %s.", 
executorMode));
                }
diff --git 
a/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala 
b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala
new file mode 100644
index 0000000..d6f6d76
--- /dev/null
+++ b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package akka.dispatch
+
+import java.util.concurrent.ThreadFactory
+
+/**
+  * Composition over the [[DispatcherPrerequisites.threadFactory]] that set 
priority
+  * for newly created threads.
+  *
+  * @param newThreadPriority priority that will be set to each newly created 
thread
+  *                          should be between Thread.MIN_PRIORITY and 
Thread.MAX_PRIORITY
+  */
+class PriorityThreadFactory(
+  prerequisites: DispatcherPrerequisites,
+  newThreadPriority: Int) extends ThreadFactory {
+    override def newThread(r: Runnable): Thread = {
+      val newThread = prerequisites.threadFactory.newThread(r)
+      newThread.setPriority(newThreadPriority)
+      newThread
+    }
+}
diff --git 
a/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcher.scala 
b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcher.scala
new file mode 100644
index 0000000..06ef001
--- /dev/null
+++ b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcher.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package akka.dispatch
+
+import com.typesafe.config.Config
+
+/**
+  * Akka Dispatcher that creates thread with configurable priority.
+  *
+  * Example of configuration:
+  *
+  *   low-priority-threads-dispatcher {
+  *     type = akka.dispatch.PriorityThreadsDispatcher
+  *     executor = "thread-pool-executor"
+  *     # should be between Thread.MIN_PRIORITY (which is 1) and 
Thread.MAX_PRIORITY (which is 10)
+  *     threads-priority = 1
+  *     thread-pool-executor {
+  *       core-pool-size-min = 0
+  *       core-pool-size-factor = 2.0
+  *       core-pool-size-max = 10
+  *     }
+  *   }
+  *
+  * Two arguments constructor (the primary constructor) is automatically 
called by Akka
+  * when it founds:
+  *   abcde-dispatcher {
+  *     type = akka.dispatch.PriorityThreadsDispatcher <-- the class that Akka 
will instantiate
+  *     ...
+  *   }
+  *
+  * @param config passed automatically by Akka, should contains information 
about threads priority
+  * @param prerequisites passed automatically by Akka
+  */
+class PriorityThreadsDispatcher(config: Config, prerequisites: 
DispatcherPrerequisites)
+  extends DispatcherConfigurator(
+    config,
+    new PriorityThreadsDispatcherPrerequisites(
+      prerequisites,
+      config.getInt(PriorityThreadsDispatcher.threadPriorityConfigKey)
+    )
+  )
+
+object PriorityThreadsDispatcher {
+  /**
+    * Configuration key under which int value should be placed.
+    */
+  val threadPriorityConfigKey = "thread-priority"
+}
diff --git 
a/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcherPrerequisites.scala
 
b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcherPrerequisites.scala
new file mode 100644
index 0000000..a62f121
--- /dev/null
+++ 
b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcherPrerequisites.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package akka.dispatch
+
+/**
+  * Composition over [[DefaultDispatcherPrerequisites]] that replaces thread 
factory with one that
+  * allow to configure thread priority.
+  *
+  * @param newThreadPriority priority that will be set to each newly created 
thread
+  *                          should be between Thread.MIN_PRIORITY and 
Thread.MAX_PRIORITY
+  */
+class PriorityThreadsDispatcherPrerequisites(
+  prerequisites: DispatcherPrerequisites,
+  newThreadPriority: Int) extends DispatcherPrerequisites {
+
+  private val defaultDispatcherPrerequisites : DefaultDispatcherPrerequisites =
+    new DefaultDispatcherPrerequisites(
+      eventStream = prerequisites.eventStream,
+      scheduler = prerequisites.scheduler,
+      dynamicAccess = prerequisites.dynamicAccess,
+      settings = prerequisites.settings,
+      mailboxes = prerequisites.mailboxes,
+      defaultExecutionContext = prerequisites.defaultExecutionContext,
+      threadFactory = new PriorityThreadFactory(prerequisites, 
newThreadPriority)
+  )
+
+  override def threadFactory : java.util.concurrent.ThreadFactory = {
+    defaultDispatcherPrerequisites.threadFactory
+  }
+
+  override def eventStream : akka.event.EventStream = {
+    defaultDispatcherPrerequisites.eventStream
+  }
+
+  override def scheduler : akka.actor.Scheduler = {
+    defaultDispatcherPrerequisites.scheduler
+  }
+
+  override def dynamicAccess : akka.actor.DynamicAccess = {
+    defaultDispatcherPrerequisites.dynamicAccess
+  }
+
+  override def settings : akka.actor.ActorSystem.Settings = {
+    defaultDispatcherPrerequisites.settings
+  }
+
+  override def mailboxes : akka.dispatch.Mailboxes = {
+    defaultDispatcherPrerequisites.mailboxes
+  }
+
+  override def defaultExecutionContext : 
scala.Option[scala.concurrent.ExecutionContext] = {
+    defaultDispatcherPrerequisites.defaultExecutionContext
+  }
+}
+
+object PriorityThreadsDispatcherPrerequisites {
+  def apply(prerequisites: DispatcherPrerequisites, newThreadPriority: Int):
+    PriorityThreadsDispatcherPrerequisites =
+      new PriorityThreadsDispatcherPrerequisites(prerequisites, 
newThreadPriority)
+}
+
+
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index de2f35e..9c33d2a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -26,7 +26,7 @@ import akka.actor._
 import akka.pattern.{ask => akkaAsk}
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.flink.api.common.time.Time
-import org.apache.flink.configuration.{AkkaOptions, Configuration, 
IllegalConfigurationException, SecurityOptions}
+import org.apache.flink.configuration._
 import org.apache.flink.runtime.concurrent.FutureUtils
 import org.apache.flink.runtime.net.SSLUtils
 import org.apache.flink.util.NetUtils
@@ -291,12 +291,21 @@ object AkkaUtils {
     ConfigFactory.parseString(config)
   }
 
-  def getThreadPoolExecutorConfig: Config = {
+  def getThreadPoolExecutorConfig(threadPriority: Int): Config = {
+    if (threadPriority < Thread.MIN_PRIORITY || threadPriority > 
Thread.MAX_PRIORITY) {
+      throw new IllegalConfigurationException("The config : " +
+        MetricOptions.QUERY_SERVICE_THREAD_PRIORITY.key() + "'s value must 
between "
+        + Thread.MIN_PRIORITY + " and " + Thread.MAX_PRIORITY +
+        ", but the value is " + threadPriority)
+    }
+
     val configString = s"""
        |akka {
        |  actor {
        |    default-dispatcher {
+       |      type = akka.dispatch.PriorityThreadsDispatcher
        |      executor = "thread-pool-executor"
+       |      thread-priority = $threadPriority
        |      thread-pool-executor {
        |        core-pool-size-min = 2
        |        core-pool-size-factor = 2.0
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index e5c1668..ae6209a 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -20,14 +20,17 @@ package org.apache.flink.runtime.akka
 
 import java.net.{InetAddress, InetSocketAddress}
 
-import org.apache.flink.configuration.{AkkaOptions, Configuration, 
IllegalConfigurationException}
+import org.apache.flink.configuration.{AkkaOptions, Configuration, 
IllegalConfigurationException, MetricOptions}
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
+import org.apache.flink.runtime.metrics.util.MetricUtils
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.AkkaProtocol
 import org.apache.flink.util.NetUtils
+import org.junit.Assert.assertEquals
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+import org.slf4j.LoggerFactory
 
 @RunWith(classOf[JUnitRunner])
 class AkkaUtilsTest
@@ -184,13 +187,44 @@ class AkkaUtilsTest
   }
 
   test("getAkkaConfig respects executor config") {
-    val akkaConfig = AkkaUtils.getAkkaConfig(
+    var akkaConfig = AkkaUtils.getAkkaConfig(
       new Configuration(),
       "localhost",
       1234,
-      AkkaUtils.getThreadPoolExecutorConfig)
+      AkkaUtils.getThreadPoolExecutorConfig(Thread.MIN_PRIORITY))
 
     akkaConfig.getString("akka.actor.default-dispatcher.executor") should
       equal("thread-pool-executor")
+
+    akkaConfig.getInt("akka.actor.default-dispatcher.thread-priority") should
+      equal(Thread.MIN_PRIORITY)
+
+    akkaConfig = AkkaUtils.getAkkaConfig(
+      new Configuration(),
+      "localhost",
+      1234,
+      AkkaUtils.getThreadPoolExecutorConfig(Thread.MAX_PRIORITY))
+
+    akkaConfig.getInt("akka.actor.default-dispatcher.thread-priority") should
+      equal(Thread.MAX_PRIORITY)
+  }
+
+  test("thread priority for metrics ActorSystem ") {
+    var actorSystem = MetricUtils.startMetricsActorSystem(
+      new Configuration, "localhost", LoggerFactory.getLogger("AkkaUtilsTest"))
+    //test default thread priority
+    val defaultThreadPriority = actorSystem.settings.config.getInt(
+      "akka.actor.default-dispatcher.thread-priority")
+    //check default value
+    assertEquals(Thread.MIN_PRIORITY, defaultThreadPriority)
+
+    val config = new Configuration()
+    config.setInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY, 
Thread.MAX_PRIORITY)
+    actorSystem = MetricUtils.startMetricsActorSystem(
+      config, "localhost", LoggerFactory.getLogger("AkkaUtilsTest"))
+    val threadPriority = actorSystem.settings.config.getInt(
+      "akka.actor.default-dispatcher.thread-priority")
+    //check config value
+    assertEquals(Thread.MAX_PRIORITY, threadPriority)
   }
 }

Reply via email to