This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f81297ac224f45d750ee0616755ade809b57ea4a
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Oct 17 00:47:22 2018 +0200

    [FLINK-10253] Add ActorSystemExecutorConfiguration to configure ActorSystem's executor
    
    Add MetricUtilsTest#testStartMetricActorSystemRespectsThreadPriority
    
    This closes #6839.
---
 .../runtime/clusterframework/BootstrapTools.java   | 150 ++++++++++++++++-----
 .../runtime/entrypoint/ClusterEntrypoint.java      |   4 +-
 .../flink/runtime/metrics/util/MetricUtils.java    |   4 +-
 .../org/apache/flink/runtime/akka/AkkaUtils.scala  |  33 +++--
 .../runtime/metrics/util/MetricUtilsTest.java      |  60 +++++++++
 .../apache/flink/runtime/akka/AkkaUtilsTest.scala  |  46 ++-----
 6 files changed, 208 insertions(+), 89 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 430af98..d95034d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -19,12 +19,12 @@
 package org.apache.flink.runtime.clusterframework;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -99,7 +99,7 @@ public class BootstrapTools {
                        listeningAddress,
                        portRangeDefinition,
                        logger,
-                       ActorSystemExecutorMode.FORK_JOIN_EXECUTOR);
+                       
ForkJoinExecutorConfiguration.fromConfiguration(configuration));
        }
 
        /**
@@ -109,7 +109,7 @@ public class BootstrapTools {
         * @param listeningAddress The address to listen at.
         * @param portRangeDefinition The port range to choose a port from.
         * @param logger The logger to output log information.
-        * @param executorMode The executor mode of Akka actor system.
+        * @param actorSystemExecutorConfiguration configuration for the 
ActorSystem's underlying executor
         * @return The ActorSystem which has been started
         * @throws Exception Thrown when actor system cannot be started in 
specified port range
         */
@@ -118,14 +118,14 @@ public class BootstrapTools {
                        String listeningAddress,
                        String portRangeDefinition,
                        Logger logger,
-                       @Nonnull ActorSystemExecutorMode executorMode) throws 
Exception {
+                       @Nonnull ActorSystemExecutorConfiguration 
actorSystemExecutorConfiguration) throws Exception {
                return startActorSystem(
                        configuration,
                        AkkaUtils.getFlinkActorSystemName(),
                        listeningAddress,
                        portRangeDefinition,
                        logger,
-                       executorMode);
+                       actorSystemExecutorConfiguration);
        }
 
        /**
@@ -136,7 +136,7 @@ public class BootstrapTools {
         * @param listeningAddress The address to listen at.
         * @param portRangeDefinition The port range to choose a port from.
         * @param logger The logger to output log information.
-        * @param executorMode The executor mode of Akka actor system.
+        * @param actorSystemExecutorConfiguration configuration for the 
ActorSystem's underlying executor
         * @return The ActorSystem which has been started
         * @throws Exception Thrown when actor system cannot be started in 
specified port range
         */
@@ -146,7 +146,7 @@ public class BootstrapTools {
                        String listeningAddress,
                        String portRangeDefinition,
                        Logger logger,
-                       @Nonnull ActorSystemExecutorMode executorMode) throws 
Exception {
+                       @Nonnull ActorSystemExecutorConfiguration 
actorSystemExecutorConfiguration) throws Exception {
 
                // parse port range definition and create port iterator
                Iterator<Integer> portsIterator;
@@ -178,7 +178,7 @@ public class BootstrapTools {
                                        listeningAddress,
                                        port,
                                        logger,
-                                       executorMode);
+                                       actorSystemExecutorConfiguration);
                        }
                        catch (Exception e) {
                                // we can continue to try if this contains a 
netty channel exception
@@ -210,7 +210,12 @@ public class BootstrapTools {
                String listeningAddress,
                int listeningPort,
                Logger logger) throws Exception {
-               return startActorSystem(configuration, listeningAddress, 
listeningPort, logger, ActorSystemExecutorMode.FORK_JOIN_EXECUTOR);
+               return startActorSystem(
+                       configuration,
+                       listeningAddress,
+                       listeningPort,
+                       logger,
+                       
ForkJoinExecutorConfiguration.fromConfiguration(configuration));
        }
 
        /**
@@ -219,7 +224,7 @@ public class BootstrapTools {
         * @param listeningAddress The address to listen at.
         * @param listeningPort The port to listen at.
         * @param logger the logger to output log information.
-        * @param executorMode The executor mode of Akka actor system.
+        * @param actorSystemExecutorConfiguration configuration for the 
ActorSystem's underlying executor
         * @return The ActorSystem which has been started.
         * @throws Exception
         */
@@ -228,14 +233,14 @@ public class BootstrapTools {
                                String listeningAddress,
                                int listeningPort,
                                Logger logger,
-                               ActorSystemExecutorMode executorMode) throws 
Exception {
+                               ActorSystemExecutorConfiguration 
actorSystemExecutorConfiguration) throws Exception {
                return startActorSystem(
                        configuration,
                        AkkaUtils.getFlinkActorSystemName(),
                        listeningAddress,
                        listeningPort,
                        logger,
-                       executorMode);
+                       actorSystemExecutorConfiguration);
        }
 
        /**
@@ -245,7 +250,7 @@ public class BootstrapTools {
         * @param listeningAddress The address to listen at.
         * @param listeningPort The port to listen at.
         * @param logger the logger to output log information.
-        * @param executorMode The executor mode of Akka actor system.
+        * @param actorSystemExecutorConfiguration configuration for the 
ActorSystem's underlying executor
         * @return The ActorSystem which has been started.
         * @throws Exception
         */
@@ -255,7 +260,7 @@ public class BootstrapTools {
                String listeningAddress,
                int listeningPort,
                Logger logger,
-               ActorSystemExecutorMode executorMode) throws Exception {
+               ActorSystemExecutorConfiguration 
actorSystemExecutorConfiguration) throws Exception {
 
                String hostPortUrl = 
NetUtils.unresolvedHostAndPortToNormalizedString(listeningAddress, 
listeningPort);
                logger.info("Trying to start actor system at {}", hostPortUrl);
@@ -264,8 +269,7 @@ public class BootstrapTools {
                        Config akkaConfig = AkkaUtils.getAkkaConfig(
                                configuration,
                                new Some<>(new Tuple2<>(listeningAddress, 
listeningPort)),
-                               getExecutorConfigByExecutorMode(configuration, 
executorMode)
-                       );
+                               
actorSystemExecutorConfiguration.getAkkaConfig());
 
                        logger.debug("Using akka configuration\n {}", 
akkaConfig);
 
@@ -286,18 +290,6 @@ public class BootstrapTools {
                }
        }
 
-       private static Config getExecutorConfigByExecutorMode(Configuration 
configuration, ActorSystemExecutorMode executorMode) {
-               switch (executorMode) {
-                       case FORK_JOIN_EXECUTOR:
-                               return 
AkkaUtils.getForkJoinExecutorConfig(configuration);
-                       case FIXED_THREAD_POOL_EXECUTOR:
-                               return AkkaUtils.getThreadPoolExecutorConfig(
-                                       
configuration.getInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY));
-                       default:
-                               throw new 
IllegalArgumentException(String.format("Unknown ActorSystemExecutorMode %s.", 
executorMode));
-               }
-       }
-
        /**
         * Starts the web frontend.
         *
@@ -627,12 +619,102 @@ public class BootstrapTools {
        }
 
        /**
-        * Options to specify which executor to use in an {@link ActorSystem}.
+        * Configuration interface for {@link ActorSystem} underlying executor.
+        */
+       interface ActorSystemExecutorConfiguration {
+
+               /**
+                * Create the executor {@link Config} for the respective 
executor.
+                *
+                * @return Akka config for the respective executor
+                */
+               Config getAkkaConfig();
+       }
+
+       /**
+        * Configuration for a fork join executor.
         */
-       public enum ActorSystemExecutorMode {
-               /** Used by default, use dispatcher with fork-join-executor. **/
-               FORK_JOIN_EXECUTOR,
-               /** Use dispatcher with fixed thread pool executor. **/
-               FIXED_THREAD_POOL_EXECUTOR
+       public static class ForkJoinExecutorConfiguration implements 
ActorSystemExecutorConfiguration {
+
+               private final double parallelismFactor;
+
+               private final int minParallelism;
+
+               private final int maxParallelism;
+
+               public ForkJoinExecutorConfiguration(double parallelismFactor, 
int minParallelism, int maxParallelism) {
+                       this.parallelismFactor = parallelismFactor;
+                       this.minParallelism = minParallelism;
+                       this.maxParallelism = maxParallelism;
+               }
+
+               public double getParallelismFactor() {
+                       return parallelismFactor;
+               }
+
+               public int getMinParallelism() {
+                       return minParallelism;
+               }
+
+               public int getMaxParallelism() {
+                       return maxParallelism;
+               }
+
+               @Override
+               public Config getAkkaConfig() {
+                       return AkkaUtils.getForkJoinExecutorConfig(this);
+               }
+
+               public static ForkJoinExecutorConfiguration 
fromConfiguration(final Configuration configuration) {
+                       final double parallelismFactor = 
configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR);
+                       final int minParallelism = 
configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN);
+                       final int maxParallelism = 
configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX);
+
+                       return new 
ForkJoinExecutorConfiguration(parallelismFactor, minParallelism, 
maxParallelism);
+               }
+       }
+
+       /**
+        * Configuration for a fixed thread pool executor.
+        */
+       public static class FixedThreadPoolExecutorConfiguration implements 
ActorSystemExecutorConfiguration {
+
+               private final int minNumThreads;
+
+               private final int maxNumThreads;
+
+               private final int threadPriority;
+
+               public FixedThreadPoolExecutorConfiguration(int minNumThreads, 
int maxNumThreads, int threadPriority) {
+                       if (threadPriority < Thread.MIN_PRIORITY || 
threadPriority > Thread.MAX_PRIORITY) {
+                               throw new IllegalArgumentException(
+                                       String.format(
+                                               "The thread priority must be 
within (%s, %s) but it was %s.",
+                                               Thread.MIN_PRIORITY,
+                                               Thread.MAX_PRIORITY,
+                                               threadPriority));
+                       }
+
+                       this.minNumThreads = minNumThreads;
+                       this.maxNumThreads = maxNumThreads;
+                       this.threadPriority = threadPriority;
+               }
+
+               public int getMinNumThreads() {
+                       return minNumThreads;
+               }
+
+               public int getMaxNumThreads() {
+                       return maxNumThreads;
+               }
+
+               public int getThreadPriority() {
+                       return threadPriority;
+               }
+
+               @Override
+               public Config getAkkaConfig() {
+                       return AkkaUtils.getThreadPoolExecutorConfig(this);
+               }
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index f528bc4..f51bf69 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -85,8 +85,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import scala.concurrent.duration.FiniteDuration;
 
-import static 
org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FORK_JOIN_EXECUTOR;
-
 /**
  * Base class for the Flink cluster entry points.
  *
@@ -297,7 +295,7 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
                        Configuration configuration,
                        String bindAddress,
                        String portRange) throws Exception {
-               ActorSystem actorSystem = 
BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG, 
FORK_JOIN_EXECUTOR);
+               ActorSystem actorSystem = 
BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG);
                FiniteDuration duration = AkkaUtils.getTimeout(configuration);
                return new AkkaRpcService(actorSystem, 
Time.of(duration.length(), duration.unit()));
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index ba04af9..6e6f492 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -52,7 +52,6 @@ import java.lang.management.ThreadMXBean;
 import java.util.List;
 import java.util.Optional;
 
-import static 
org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FIXED_THREAD_POOL_EXECUTOR;
 import static 
org.apache.flink.runtime.metrics.util.SystemResourcesMetricsInitializer.instantiateSystemMetrics;
 
 /**
@@ -123,13 +122,14 @@ public class MetricUtils {
 
        public static ActorSystem startMetricsActorSystem(Configuration 
configuration, String hostname, Logger logger) throws Exception {
                final String portRange = 
configuration.getString(MetricOptions.QUERY_SERVICE_PORT);
+               final int threadPriority = 
configuration.getInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY);
                return BootstrapTools.startActorSystem(
                        configuration,
                        METRICS_ACTOR_SYSTEM_NAME,
                        hostname,
                        portRange,
                        logger,
-                       FIXED_THREAD_POOL_EXECUTOR);
+                       new 
BootstrapTools.FixedThreadPoolExecutorConfiguration(1, 1, threadPriority));
        }
 
        private static void instantiateNetworkMetrics(
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 9c33d2a..24448c7 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -27,6 +27,7 @@ import akka.pattern.{ask => akkaAsk}
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration._
+import 
org.apache.flink.runtime.clusterframework.BootstrapTools.{FixedThreadPoolExecutorConfiguration,
 ForkJoinExecutorConfiguration}
 import org.apache.flink.runtime.concurrent.FutureUtils
 import org.apache.flink.runtime.net.SSLUtils
 import org.apache.flink.util.NetUtils
@@ -187,7 +188,10 @@ object AkkaUtils {
   @throws(classOf[UnknownHostException])
   def getAkkaConfig(configuration: Configuration,
                     externalAddress: Option[(String, Int)]): Config = {
-    getAkkaConfig(configuration, externalAddress, 
getForkJoinExecutorConfig(configuration))
+    getAkkaConfig(
+      configuration,
+      externalAddress,
+      
getForkJoinExecutorConfig(ForkJoinExecutorConfiguration.fromConfiguration(configuration)))
   }
 
   /**
@@ -291,13 +295,10 @@ object AkkaUtils {
     ConfigFactory.parseString(config)
   }
 
-  def getThreadPoolExecutorConfig(threadPriority: Int): Config = {
-    if (threadPriority < Thread.MIN_PRIORITY || threadPriority > 
Thread.MAX_PRIORITY) {
-      throw new IllegalConfigurationException("The config : " +
-        MetricOptions.QUERY_SERVICE_THREAD_PRIORITY.key() + "'s value must 
between "
-        + Thread.MIN_PRIORITY + " and " + Thread.MAX_PRIORITY +
-        ", but the value is " + threadPriority)
-    }
+  def getThreadPoolExecutorConfig(configuration: 
FixedThreadPoolExecutorConfiguration): Config = {
+    val threadPriority = configuration.getThreadPriority
+    val minNumThreads = configuration.getMinNumThreads
+    val maxNumThreads = configuration.getMaxNumThreads
 
     val configString = s"""
        |akka {
@@ -307,9 +308,8 @@ object AkkaUtils {
        |      executor = "thread-pool-executor"
        |      thread-priority = $threadPriority
        |      thread-pool-executor {
-       |        core-pool-size-min = 2
-       |        core-pool-size-factor = 2.0
-       |        core-pool-size-max = 4
+       |        core-pool-size-min = $minNumThreads
+       |        core-pool-size-max = $maxNumThreads
        |      }
        |    }
        |  }
@@ -320,15 +320,12 @@ object AkkaUtils {
     ConfigFactory.parseString(configString)
   }
 
-  def getForkJoinExecutorConfig(configuration: Configuration): Config = {
-    val forkJoinExecutorParallelismFactor =
-      
configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR)
+  def getForkJoinExecutorConfig(configuration: ForkJoinExecutorConfiguration): 
Config = {
+    val forkJoinExecutorParallelismFactor = configuration.getParallelismFactor
 
-    val forkJoinExecutorParallelismMin =
-      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN)
+    val forkJoinExecutorParallelismMin = configuration.getMinParallelism
 
-    val forkJoinExecutorParallelismMax =
-      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX)
+    val forkJoinExecutorParallelismMax = configuration.getMaxParallelism
 
     val configString = s"""
        |akka {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
new file mode 100644
index 0000000..bea0f2a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link MetricUtils} class.
+ */
+public class MetricUtilsTest extends TestLogger {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(MetricUtilsTest.class);
+
+       /**
+        * Tests that the {@link 
MetricUtils#startMetricsActorSystem(Configuration, String, Logger)} respects
+        * the given {@link MetricOptions#QUERY_SERVICE_THREAD_PRIORITY}.
+        */
+       @Test
+       public void testStartMetricActorSystemRespectsThreadPriority() throws 
Exception {
+               final Configuration configuration = new Configuration();
+               final int expectedThreadPriority = 3;
+               
configuration.setInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY, 
expectedThreadPriority);
+               final ActorSystem actorSystem = 
MetricUtils.startMetricsActorSystem(configuration, "localhost", LOG);
+
+               try {
+                       final int threadPriority = 
actorSystem.settings().config().getInt("akka.actor.default-dispatcher.thread-priority");
+
+                       assertThat(threadPriority, is(expectedThreadPriority));
+               } finally {
+                       AkkaUtils.terminateActorSystem(actorSystem).get();
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index ae6209a..c051f7a 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.akka
 import java.net.{InetAddress, InetSocketAddress}
 
 import org.apache.flink.configuration.{AkkaOptions, Configuration, 
IllegalConfigurationException, MetricOptions}
+import 
org.apache.flink.runtime.clusterframework.BootstrapTools.FixedThreadPoolExecutorConfiguration
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
 import org.apache.flink.runtime.metrics.util.MetricUtils
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
@@ -186,45 +187,26 @@ class AkkaUtilsTest
       equal("fork-join-executor")
   }
 
-  test("getAkkaConfig respects executor config") {
-    var akkaConfig = AkkaUtils.getAkkaConfig(
+  test("getAkkaConfig sets executor with thread priority") {
+    val threadPriority = 3
+    val minThreads = 1
+    val maxThreads = 3
+    val akkaConfig = AkkaUtils.getAkkaConfig(
       new Configuration(),
       "localhost",
       1234,
-      AkkaUtils.getThreadPoolExecutorConfig(Thread.MIN_PRIORITY))
+      AkkaUtils.getThreadPoolExecutorConfig(
+        new FixedThreadPoolExecutorConfiguration(minThreads, maxThreads, 
threadPriority)
+      ))
 
     akkaConfig.getString("akka.actor.default-dispatcher.executor") should
       equal("thread-pool-executor")
 
     akkaConfig.getInt("akka.actor.default-dispatcher.thread-priority") should
-      equal(Thread.MIN_PRIORITY)
-
-    akkaConfig = AkkaUtils.getAkkaConfig(
-      new Configuration(),
-      "localhost",
-      1234,
-      AkkaUtils.getThreadPoolExecutorConfig(Thread.MAX_PRIORITY))
-
-    akkaConfig.getInt("akka.actor.default-dispatcher.thread-priority") should
-      equal(Thread.MAX_PRIORITY)
-  }
-
-  test("thread priority for metrics ActorSystem ") {
-    var actorSystem = MetricUtils.startMetricsActorSystem(
-      new Configuration, "localhost", LoggerFactory.getLogger("AkkaUtilsTest"))
-    //test default thread priority
-    val defaultThreadPriority = actorSystem.settings.config.getInt(
-      "akka.actor.default-dispatcher.thread-priority")
-    //check default value
-    assertEquals(Thread.MIN_PRIORITY, defaultThreadPriority)
-
-    val config = new Configuration()
-    config.setInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY, 
Thread.MAX_PRIORITY)
-    actorSystem = MetricUtils.startMetricsActorSystem(
-      config, "localhost", LoggerFactory.getLogger("AkkaUtilsTest"))
-    val threadPriority = actorSystem.settings.config.getInt(
-      "akka.actor.default-dispatcher.thread-priority")
-    //check config value
-    assertEquals(Thread.MAX_PRIORITY, threadPriority)
+      equal(threadPriority)
+    
akkaConfig.getInt("akka.actor.default-dispatcher.thread-pool-executor.core-pool-size-min")
+      .should(equal(minThreads))
+    
akkaConfig.getInt("akka.actor.default-dispatcher.thread-pool-executor.core-pool-size-max")
+      .should(equal(maxThreads))
   }
 }

Reply via email to