This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 39324331979cdedb9832bf06408bb182fc9476fa Author: yanghua <[email protected]> AuthorDate: Fri Oct 12 14:05:23 2018 +0800 [FLINK-10253] Run MetricQueryService with lower priority --- docs/_includes/generated/metric_configuration.html | 5 ++ .../apache/flink/configuration/MetricOptions.java | 12 ++++ .../runtime/clusterframework/BootstrapTools.java | 4 +- .../akka/dispatch/PriorityThreadFactory.scala | 38 +++++++++++ .../akka/dispatch/PriorityThreadsDispatcher.scala | 64 ++++++++++++++++++ .../PriorityThreadsDispatcherPrerequisites.scala | 78 ++++++++++++++++++++++ .../org/apache/flink/runtime/akka/AkkaUtils.scala | 13 +++- .../apache/flink/runtime/akka/AkkaUtilsTest.scala | 40 ++++++++++- 8 files changed, 248 insertions(+), 6 deletions(-) diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html index 5c073f3..420bb7f 100644 --- a/docs/_includes/generated/metric_configuration.html +++ b/docs/_includes/generated/metric_configuration.html @@ -13,6 +13,11 @@ <td>The port range used for Flink's internal metric query service. Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. Per default Flink will pick a random port.</td> </tr> <tr> + <td><h5>metrics.internal.query-service.thread-priority</h5></td> + <td style="word-wrap: break-word;">1</td> + <td>The thread priority used for Flink's internal metric query service. The thread is created by Akka's thread pool executor. The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). Warning, increasing this value may bring the main Flink components down.</td> + </tr> + <tr> <td><h5>metrics.latency.granularity</h5></td> <td style="word-wrap: break-word;">"operator"</td> <td>Defines the granularity of latency metrics. 
Accepted values are:<ul><li>single - Track latency without differentiating between sources and subtasks.</li><li>operator - Track latency while differentiating between sources, but not subtasks.</li><li>subtask - Track latency while differentiating between sources and subtasks.</li></ul></td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java index 0e7268e..0785b34 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java @@ -156,6 +156,18 @@ public class MetricOptions { "ports to avoid collisions when multiple Flink components are running on the same machine. Per default " + "Flink will pick a random port."); + /** + * The thread priority for Flink's internal metric query service. The {@code 1} means the min priority and the + * {@code 10} means the max priority. + */ + public static final ConfigOption<Integer> QUERY_SERVICE_THREAD_PRIORITY = + key("metrics.internal.query-service.thread-priority") + .defaultValue(1) + .withDescription("The thread priority used for Flink's internal metric query service. The thread is created" + + " by Akka's thread pool executor. " + + "The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). 
" + + "Warning, increasing this value may bring the main Flink components down."); + private MetricOptions() { } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index 00b6173..430af98 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.akka.AkkaUtils; @@ -290,7 +291,8 @@ public class BootstrapTools { case FORK_JOIN_EXECUTOR: return AkkaUtils.getForkJoinExecutorConfig(configuration); case FIXED_THREAD_POOL_EXECUTOR: - return AkkaUtils.getThreadPoolExecutorConfig(); + return AkkaUtils.getThreadPoolExecutorConfig( + configuration.getInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY)); default: throw new IllegalArgumentException(String.format("Unknown ActorSystemExecutorMode %s.", executorMode)); } diff --git a/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala new file mode 100644 index 0000000..d6f6d76 --- /dev/null +++ b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package akka.dispatch + +import java.util.concurrent.ThreadFactory + +/** + * Composition over the [[DispatcherPrerequisites.threadFactory]] that set priority + * for newly created threads. + * + * @param newThreadPriority priority that will be set to each newly created thread + * should be between Thread.MIN_PRIORITY and Thread.MAX_PRIORITY + */ +class PriorityThreadFactory( + prerequisites: DispatcherPrerequisites, + newThreadPriority: Int) extends ThreadFactory { + override def newThread(r: Runnable): Thread = { + val newThread = prerequisites.threadFactory.newThread(r) + newThread.setPriority(newThreadPriority) + newThread + } +} diff --git a/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcher.scala b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcher.scala new file mode 100644 index 0000000..06ef001 --- /dev/null +++ b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcher.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package akka.dispatch + +import com.typesafe.config.Config + +/** + * Akka Dispatcher that creates threads with a configurable priority. + * + * Example of configuration: + * + * low-priority-threads-dispatcher { + * type = akka.dispatch.PriorityThreadsDispatcher + * executor = "thread-pool-executor" + * # should be between Thread.MIN_PRIORITY (which is 1) and Thread.MAX_PRIORITY (which is 10) + * thread-priority = 1 + * thread-pool-executor { + * core-pool-size-min = 0 + * core-pool-size-factor = 2.0 + * core-pool-size-max = 10 + * } + * } + * + * The two-argument constructor (the primary constructor) is automatically called by Akka + * when it finds: + * abcde-dispatcher { + * type = akka.dispatch.PriorityThreadsDispatcher <-- the class that Akka will instantiate + * ... + * } + * + * @param config passed automatically by Akka, should contain the thread priority setting + * @param prerequisites passed automatically by Akka + */ +class PriorityThreadsDispatcher(config: Config, prerequisites: DispatcherPrerequisites) + extends DispatcherConfigurator( + config, + new PriorityThreadsDispatcherPrerequisites( + prerequisites, + config.getInt(PriorityThreadsDispatcher.threadPriorityConfigKey) + ) + ) + +object PriorityThreadsDispatcher { + /** + * Configuration key under which int value should be placed. 
+ */ + val threadPriorityConfigKey = "thread-priority" +} diff --git a/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcherPrerequisites.scala b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcherPrerequisites.scala new file mode 100644 index 0000000..a62f121 --- /dev/null +++ b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcherPrerequisites.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package akka.dispatch + +/** + * Composition over [[DefaultDispatcherPrerequisites]] that replaces the thread factory with one that + * allows configuring the thread priority. 
+ * + * @param newThreadPriority priority that will be set to each newly created thread + * should be between Thread.MIN_PRIORITY and Thread.MAX_PRIORITY + */ +class PriorityThreadsDispatcherPrerequisites( + prerequisites: DispatcherPrerequisites, + newThreadPriority: Int) extends DispatcherPrerequisites { + + private val defaultDispatcherPrerequisites : DefaultDispatcherPrerequisites = + new DefaultDispatcherPrerequisites( + eventStream = prerequisites.eventStream, + scheduler = prerequisites.scheduler, + dynamicAccess = prerequisites.dynamicAccess, + settings = prerequisites.settings, + mailboxes = prerequisites.mailboxes, + defaultExecutionContext = prerequisites.defaultExecutionContext, + threadFactory = new PriorityThreadFactory(prerequisites, newThreadPriority) + ) + + override def threadFactory : java.util.concurrent.ThreadFactory = { + defaultDispatcherPrerequisites.threadFactory + } + + override def eventStream : akka.event.EventStream = { + defaultDispatcherPrerequisites.eventStream + } + + override def scheduler : akka.actor.Scheduler = { + defaultDispatcherPrerequisites.scheduler + } + + override def dynamicAccess : akka.actor.DynamicAccess = { + defaultDispatcherPrerequisites.dynamicAccess + } + + override def settings : akka.actor.ActorSystem.Settings = { + defaultDispatcherPrerequisites.settings + } + + override def mailboxes : akka.dispatch.Mailboxes = { + defaultDispatcherPrerequisites.mailboxes + } + + override def defaultExecutionContext : scala.Option[scala.concurrent.ExecutionContext] = { + defaultDispatcherPrerequisites.defaultExecutionContext + } +} + +object PriorityThreadsDispatcherPrerequisites { + def apply(prerequisites: DispatcherPrerequisites, newThreadPriority: Int): + PriorityThreadsDispatcherPrerequisites = + new PriorityThreadsDispatcherPrerequisites(prerequisites, newThreadPriority) +} + + diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index de2f35e..9c33d2a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -26,7 +26,7 @@ import akka.actor._ import akka.pattern.{ask => akkaAsk} import com.typesafe.config.{Config, ConfigFactory} import org.apache.flink.api.common.time.Time -import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException, SecurityOptions} +import org.apache.flink.configuration._ import org.apache.flink.runtime.concurrent.FutureUtils import org.apache.flink.runtime.net.SSLUtils import org.apache.flink.util.NetUtils @@ -291,12 +291,21 @@ object AkkaUtils { ConfigFactory.parseString(config) } - def getThreadPoolExecutorConfig: Config = { + def getThreadPoolExecutorConfig(threadPriority: Int): Config = { + if (threadPriority < Thread.MIN_PRIORITY || threadPriority > Thread.MAX_PRIORITY) { + throw new IllegalConfigurationException("The config : " + + MetricOptions.QUERY_SERVICE_THREAD_PRIORITY.key() + "'s value must between " + + Thread.MIN_PRIORITY + " and " + Thread.MAX_PRIORITY + + ", but the value is " + threadPriority) + } + val configString = s""" |akka { | actor { | default-dispatcher { + | type = akka.dispatch.PriorityThreadsDispatcher | executor = "thread-pool-executor" + | thread-priority = $threadPriority | thread-pool-executor { | core-pool-size-min = 2 | core-pool-size-factor = 2.0 diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala index e5c1668..ae6209a 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala @@ -20,14 +20,17 @@ package org.apache.flink.runtime.akka import java.net.{InetAddress, 
InetSocketAddress} -import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException} +import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException, MetricOptions} import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution +import org.apache.flink.runtime.metrics.util.MetricUtils import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.AkkaProtocol import org.apache.flink.util.NetUtils +import org.junit.Assert.assertEquals import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} +import org.slf4j.LoggerFactory @RunWith(classOf[JUnitRunner]) class AkkaUtilsTest @@ -184,13 +187,44 @@ class AkkaUtilsTest } test("getAkkaConfig respects executor config") { - val akkaConfig = AkkaUtils.getAkkaConfig( + var akkaConfig = AkkaUtils.getAkkaConfig( new Configuration(), "localhost", 1234, - AkkaUtils.getThreadPoolExecutorConfig) + AkkaUtils.getThreadPoolExecutorConfig(Thread.MIN_PRIORITY)) akkaConfig.getString("akka.actor.default-dispatcher.executor") should equal("thread-pool-executor") + + akkaConfig.getInt("akka.actor.default-dispatcher.thread-priority") should + equal(Thread.MIN_PRIORITY) + + akkaConfig = AkkaUtils.getAkkaConfig( + new Configuration(), + "localhost", + 1234, + AkkaUtils.getThreadPoolExecutorConfig(Thread.MAX_PRIORITY)) + + akkaConfig.getInt("akka.actor.default-dispatcher.thread-priority") should + equal(Thread.MAX_PRIORITY) + } + + test("thread priority for metrics ActorSystem ") { + var actorSystem = MetricUtils.startMetricsActorSystem( + new Configuration, "localhost", LoggerFactory.getLogger("AkkaUtilsTest")) + //test default thread priority + val defaultThreadPriority = actorSystem.settings.config.getInt( + "akka.actor.default-dispatcher.thread-priority") + //check default value + 
assertEquals(Thread.MIN_PRIORITY, defaultThreadPriority) + + val config = new Configuration() + config.setInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY, Thread.MAX_PRIORITY) + actorSystem = MetricUtils.startMetricsActorSystem( + config, "localhost", LoggerFactory.getLogger("AkkaUtilsTest")) + val threadPriority = actorSystem.settings.config.getInt( + "akka.actor.default-dispatcher.thread-priority") + //check config value + assertEquals(Thread.MAX_PRIORITY, threadPriority) } }
