This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f81297ac224f45d750ee0616755ade809b57ea4a Author: Till Rohrmann <[email protected]> AuthorDate: Wed Oct 17 00:47:22 2018 +0200 [FLINK-10253] Add ActorSystemExecutorConfiguration to configure ActorSystem's executor Add MetricUtilsTest#testStartMetricActorSystemRespectsThreadPriority This closes #6839. --- .../runtime/clusterframework/BootstrapTools.java | 150 ++++++++++++++++----- .../runtime/entrypoint/ClusterEntrypoint.java | 4 +- .../flink/runtime/metrics/util/MetricUtils.java | 4 +- .../org/apache/flink/runtime/akka/AkkaUtils.scala | 33 +++-- .../runtime/metrics/util/MetricUtilsTest.java | 60 +++++++++ .../apache/flink/runtime/akka/AkkaUtilsTest.scala | 46 ++----- 6 files changed, 208 insertions(+), 89 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index 430af98..d95034d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -19,12 +19,12 @@ package org.apache.flink.runtime.clusterframework; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.akka.AkkaUtils; @@ -99,7 +99,7 @@ public class BootstrapTools { listeningAddress, portRangeDefinition, logger, - ActorSystemExecutorMode.FORK_JOIN_EXECUTOR); + ForkJoinExecutorConfiguration.fromConfiguration(configuration)); } /** @@ -109,7 +109,7 @@ public class BootstrapTools { * @param listeningAddress The address to listen at. * @param portRangeDefinition The port range to choose a port from. * @param logger The logger to output log information. - * @param executorMode The executor mode of Akka actor system. + * @param actorSystemExecutorConfiguration configuration for the ActorSystem's underlying executor * @return The ActorSystem which has been started * @throws Exception Thrown when actor system cannot be started in specified port range */ @@ -118,14 +118,14 @@ public class BootstrapTools { String listeningAddress, String portRangeDefinition, Logger logger, - @Nonnull ActorSystemExecutorMode executorMode) throws Exception { + @Nonnull ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception { return startActorSystem( configuration, AkkaUtils.getFlinkActorSystemName(), listeningAddress, portRangeDefinition, logger, - executorMode); + actorSystemExecutorConfiguration); } /** @@ -136,7 +136,7 @@ public class BootstrapTools { * @param listeningAddress The address to listen at. * @param portRangeDefinition The port range to choose a port from. * @param logger The logger to output log information. - * @param executorMode The executor mode of Akka actor system. + * @param actorSystemExecutorConfiguration configuration for the ActorSystem's underlying executor * @return The ActorSystem which has been started * @throws Exception Thrown when actor system cannot be started in specified port range */ @@ -146,7 +146,7 @@ public class BootstrapTools { String listeningAddress, String portRangeDefinition, Logger logger, - @Nonnull ActorSystemExecutorMode executorMode) throws Exception { + @Nonnull ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception { // parse port range definition and create port iterator Iterator<Integer> portsIterator; @@ -178,7 +178,7 @@ public class BootstrapTools { listeningAddress, port, logger, - executorMode); + actorSystemExecutorConfiguration); } catch (Exception e) { // we can continue to try if this contains a netty channel exception @@ -210,7 +210,12 @@ public class BootstrapTools { String listeningAddress, int listeningPort, Logger logger) throws Exception { - return startActorSystem(configuration, listeningAddress, listeningPort, logger, ActorSystemExecutorMode.FORK_JOIN_EXECUTOR); + return startActorSystem( + configuration, + listeningAddress, + listeningPort, + logger, + ForkJoinExecutorConfiguration.fromConfiguration(configuration)); } /** @@ -219,7 +224,7 @@ public class BootstrapTools { * @param listeningAddress The address to listen at. * @param listeningPort The port to listen at. * @param logger the logger to output log information. - * @param executorMode The executor mode of Akka actor system. + * @param actorSystemExecutorConfiguration configuration for the ActorSystem's underlying executor * @return The ActorSystem which has been started. * @throws Exception */ @@ -228,14 +233,14 @@ public class BootstrapTools { String listeningAddress, int listeningPort, Logger logger, - ActorSystemExecutorMode executorMode) throws Exception { + ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception { return startActorSystem( configuration, AkkaUtils.getFlinkActorSystemName(), listeningAddress, listeningPort, logger, - executorMode); + actorSystemExecutorConfiguration); } /** @@ -245,7 +250,7 @@ public class BootstrapTools { * @param listeningAddress The address to listen at. * @param listeningPort The port to listen at. * @param logger the logger to output log information. - * @param executorMode The executor mode of Akka actor system. + * @param actorSystemExecutorConfiguration configuration for the ActorSystem's underlying executor * @return The ActorSystem which has been started. * @throws Exception */ @@ -255,7 +260,7 @@ public class BootstrapTools { String listeningAddress, int listeningPort, Logger logger, - ActorSystemExecutorMode executorMode) throws Exception { + ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception { String hostPortUrl = NetUtils.unresolvedHostAndPortToNormalizedString(listeningAddress, listeningPort); logger.info("Trying to start actor system at {}", hostPortUrl); @@ -264,8 +269,7 @@ public class BootstrapTools { Config akkaConfig = AkkaUtils.getAkkaConfig( configuration, new Some<>(new Tuple2<>(listeningAddress, listeningPort)), - getExecutorConfigByExecutorMode(configuration, executorMode) - ); + actorSystemExecutorConfiguration.getAkkaConfig()); logger.debug("Using akka configuration\n {}", akkaConfig); @@ -286,18 +290,6 @@ public class BootstrapTools { } } - private static Config getExecutorConfigByExecutorMode(Configuration configuration, ActorSystemExecutorMode executorMode) { - switch (executorMode) { - case FORK_JOIN_EXECUTOR: - return AkkaUtils.getForkJoinExecutorConfig(configuration); - case FIXED_THREAD_POOL_EXECUTOR: - return AkkaUtils.getThreadPoolExecutorConfig( - configuration.getInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY)); - default: - throw new IllegalArgumentException(String.format("Unknown ActorSystemExecutorMode %s.", executorMode)); - } - } - /** * Starts the web frontend. * @@ -627,12 +619,102 @@ public class BootstrapTools { } /** - * Options to specify which executor to use in an {@link ActorSystem}. + * Configuration interface for {@link ActorSystem} underlying executor. + */ + interface ActorSystemExecutorConfiguration { + + /** + * Create the executor {@link Config} for the respective executor. + * + * @return Akka config for the respective executor + */ + Config getAkkaConfig(); + } + + /** + * Configuration for a fork join executor. */ - public enum ActorSystemExecutorMode { - /** Used by default, use dispatcher with fork-join-executor. **/ - FORK_JOIN_EXECUTOR, - /** Use dispatcher with fixed thread pool executor. **/ - FIXED_THREAD_POOL_EXECUTOR + public static class ForkJoinExecutorConfiguration implements ActorSystemExecutorConfiguration { + + private final double parallelismFactor; + + private final int minParallelism; + + private final int maxParallelism; + + public ForkJoinExecutorConfiguration(double parallelismFactor, int minParallelism, int maxParallelism) { + this.parallelismFactor = parallelismFactor; + this.minParallelism = minParallelism; + this.maxParallelism = maxParallelism; + } + + public double getParallelismFactor() { + return parallelismFactor; + } + + public int getMinParallelism() { + return minParallelism; + } + + public int getMaxParallelism() { + return maxParallelism; + } + + @Override + public Config getAkkaConfig() { + return AkkaUtils.getForkJoinExecutorConfig(this); + } + + public static ForkJoinExecutorConfiguration fromConfiguration(final Configuration configuration) { + final double parallelismFactor = configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR); + final int minParallelism = configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN); + final int maxParallelism = configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX); + + return new ForkJoinExecutorConfiguration(parallelismFactor, minParallelism, maxParallelism); + } + } + + /** + * Configuration for a fixed thread pool executor. + */ + public static class FixedThreadPoolExecutorConfiguration implements ActorSystemExecutorConfiguration { + + private final int minNumThreads; + + private final int maxNumThreads; + + private final int threadPriority; + + public FixedThreadPoolExecutorConfiguration(int minNumThreads, int maxNumThreads, int threadPriority) { + if (threadPriority < Thread.MIN_PRIORITY || threadPriority > Thread.MAX_PRIORITY) { + throw new IllegalArgumentException( + String.format( + "The thread priority must be within (%s, %s) but it was %s.", + Thread.MIN_PRIORITY, + Thread.MAX_PRIORITY, + threadPriority)); + } + + this.minNumThreads = minNumThreads; + this.maxNumThreads = maxNumThreads; + this.threadPriority = threadPriority; + } + + public int getMinNumThreads() { + return minNumThreads; + } + + public int getMaxNumThreads() { + return maxNumThreads; + } + + public int getThreadPriority() { + return threadPriority; + } + + @Override + public Config getAkkaConfig() { + return AkkaUtils.getThreadPoolExecutorConfig(this); + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index f528bc4..f51bf69 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -85,8 +85,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import scala.concurrent.duration.FiniteDuration; -import static org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FORK_JOIN_EXECUTOR; - /** * Base class for the Flink cluster entry points. * @@ -297,7 +295,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro Configuration configuration, String bindAddress, String portRange) throws Exception { - ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG, FORK_JOIN_EXECUTOR); + ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG); FiniteDuration duration = AkkaUtils.getTimeout(configuration); return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit())); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java index ba04af9..6e6f492 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java @@ -52,7 +52,6 @@ import java.lang.management.ThreadMXBean; import java.util.List; import java.util.Optional; -import static org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FIXED_THREAD_POOL_EXECUTOR; import static org.apache.flink.runtime.metrics.util.SystemResourcesMetricsInitializer.instantiateSystemMetrics; /** @@ -123,13 +122,14 @@ public class MetricUtils { public static ActorSystem startMetricsActorSystem(Configuration configuration, String hostname, Logger logger) throws Exception { final String portRange = configuration.getString(MetricOptions.QUERY_SERVICE_PORT); + final int threadPriority = configuration.getInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY); return BootstrapTools.startActorSystem( configuration, METRICS_ACTOR_SYSTEM_NAME, hostname, portRange, logger, - FIXED_THREAD_POOL_EXECUTOR); + new BootstrapTools.FixedThreadPoolExecutorConfiguration(1, 1, threadPriority)); } private static void instantiateNetworkMetrics( diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 9c33d2a..24448c7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -27,6 +27,7 @@ import akka.pattern.{ask => akkaAsk} import com.typesafe.config.{Config, ConfigFactory} import org.apache.flink.api.common.time.Time import org.apache.flink.configuration._ +import org.apache.flink.runtime.clusterframework.BootstrapTools.{FixedThreadPoolExecutorConfiguration, ForkJoinExecutorConfiguration} import org.apache.flink.runtime.concurrent.FutureUtils import org.apache.flink.runtime.net.SSLUtils import org.apache.flink.util.NetUtils @@ -187,7 +188,10 @@ object AkkaUtils { @throws(classOf[UnknownHostException]) def getAkkaConfig(configuration: Configuration, externalAddress: Option[(String, Int)]): Config = { - getAkkaConfig(configuration, externalAddress, getForkJoinExecutorConfig(configuration)) + getAkkaConfig( + configuration, + externalAddress, + getForkJoinExecutorConfig(ForkJoinExecutorConfiguration.fromConfiguration(configuration))) } /** @@ -291,13 +295,10 @@ object AkkaUtils { ConfigFactory.parseString(config) } - def getThreadPoolExecutorConfig(threadPriority: Int): Config = { - if (threadPriority < Thread.MIN_PRIORITY || threadPriority > Thread.MAX_PRIORITY) { - throw new IllegalConfigurationException("The config : " + - MetricOptions.QUERY_SERVICE_THREAD_PRIORITY.key() + "'s value must between " - + Thread.MIN_PRIORITY + " and " + Thread.MAX_PRIORITY + - ", but the value is " + threadPriority) - } + def getThreadPoolExecutorConfig(configuration: FixedThreadPoolExecutorConfiguration): Config = { + val threadPriority = configuration.getThreadPriority + val minNumThreads = configuration.getMinNumThreads + val maxNumThreads = configuration.getMaxNumThreads val configString = s""" |akka { @@ -307,9 +308,8 @@ object AkkaUtils { | executor = "thread-pool-executor" | thread-priority = $threadPriority | thread-pool-executor { - | core-pool-size-min = 2 - | core-pool-size-factor = 2.0 - | core-pool-size-max = 4 + | core-pool-size-min = $minNumThreads + | core-pool-size-max = $maxNumThreads | } | } | } @@ -320,15 +320,12 @@ object AkkaUtils { ConfigFactory.parseString(configString) } - def getForkJoinExecutorConfig(configuration: Configuration): Config = { - val forkJoinExecutorParallelismFactor = - configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR) + def getForkJoinExecutorConfig(configuration: ForkJoinExecutorConfiguration): Config = { + val forkJoinExecutorParallelismFactor = configuration.getParallelismFactor - val forkJoinExecutorParallelismMin = - configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN) + val forkJoinExecutorParallelismMin = configuration.getMinParallelism - val forkJoinExecutorParallelismMax = - configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX) + val forkJoinExecutorParallelismMax = configuration.getMaxParallelism val configString = s""" |akka { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java new file mode 100644 index 0000000..bea0f2a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.util.TestLogger; + +import akka.actor.ActorSystem; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link MetricUtils} class. + */ +public class MetricUtilsTest extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(MetricUtilsTest.class); + + /** + * Tests that the {@link MetricUtils#startMetricsActorSystem(Configuration, String, Logger)} respects + * the given {@link MetricOptions#QUERY_SERVICE_THREAD_PRIORITY}. + */ + @Test + public void testStartMetricActorSystemRespectsThreadPriority() throws Exception { + final Configuration configuration = new Configuration(); + final int expectedThreadPriority = 3; + configuration.setInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY, expectedThreadPriority); + final ActorSystem actorSystem = MetricUtils.startMetricsActorSystem(configuration, "localhost", LOG); + + try { + final int threadPriority = actorSystem.settings().config().getInt("akka.actor.default-dispatcher.thread-priority"); + + assertThat(threadPriority, is(expectedThreadPriority)); + } finally { + AkkaUtils.terminateActorSystem(actorSystem).get(); + } + } +} diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala index ae6209a..c051f7a 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala @@ -21,6 +21,7 @@ package org.apache.flink.runtime.akka import java.net.{InetAddress, InetSocketAddress} import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException, MetricOptions} +import org.apache.flink.runtime.clusterframework.BootstrapTools.FixedThreadPoolExecutorConfiguration import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution import org.apache.flink.runtime.metrics.util.MetricUtils import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils @@ -186,45 +187,26 @@ class AkkaUtilsTest equal("fork-join-executor") } - test("getAkkaConfig respects executor config") { - var akkaConfig = AkkaUtils.getAkkaConfig( + test("getAkkaConfig sets executor with thread priority") { + val threadPriority = 3 + val minThreads = 1 + val maxThreads = 3 + val akkaConfig = AkkaUtils.getAkkaConfig( new Configuration(), "localhost", 1234, - AkkaUtils.getThreadPoolExecutorConfig(Thread.MIN_PRIORITY)) + AkkaUtils.getThreadPoolExecutorConfig( + new FixedThreadPoolExecutorConfiguration(minThreads, maxThreads, threadPriority) + )) akkaConfig.getString("akka.actor.default-dispatcher.executor") should equal("thread-pool-executor") akkaConfig.getInt("akka.actor.default-dispatcher.thread-priority") should - equal(Thread.MIN_PRIORITY) - - akkaConfig = AkkaUtils.getAkkaConfig( - new Configuration(), - "localhost", - 1234, - AkkaUtils.getThreadPoolExecutorConfig(Thread.MAX_PRIORITY)) - - akkaConfig.getInt("akka.actor.default-dispatcher.thread-priority") should - equal(Thread.MAX_PRIORITY) - } - - test("thread priority for metrics ActorSystem ") { - var actorSystem = MetricUtils.startMetricsActorSystem( - new Configuration, "localhost", LoggerFactory.getLogger("AkkaUtilsTest")) - //test default thread priority - val defaultThreadPriority = actorSystem.settings.config.getInt( - "akka.actor.default-dispatcher.thread-priority") - //check default value - assertEquals(Thread.MIN_PRIORITY, defaultThreadPriority) - - val config = new Configuration() - config.setInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY, Thread.MAX_PRIORITY) - actorSystem = MetricUtils.startMetricsActorSystem( - config, "localhost", LoggerFactory.getLogger("AkkaUtilsTest")) - val threadPriority = actorSystem.settings.config.getInt( - "akka.actor.default-dispatcher.thread-priority") - //check config value - assertEquals(Thread.MAX_PRIORITY, threadPriority) + equal(threadPriority) + akkaConfig.getInt("akka.actor.default-dispatcher.thread-pool-executor.core-pool-size-min") + .should(equal(minThreads)) + akkaConfig.getInt("akka.actor.default-dispatcher.thread-pool-executor.core-pool-size-max") + .should(equal(maxThreads)) } }
