This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 61330e47afe8960c804b8cd2edf2ba589b594d8b
Author: Till Rohrmann <[email protected]>
AuthorDate: Tue Oct 16 13:52:53 2018 +0200

    Revert "[FLINK-10247] Run query service in separate ActorSystem"
    
    This reverts commit 9f593cd23d130b5a1266bd68ea5b34860c170c39.
---
 docs/_includes/generated/metric_configuration.html |   5 -
 .../apache/flink/configuration/MetricOptions.java  |  12 --
 .../apache/flink/types/SerializableOptional.java   |  86 -------------
 .../runtime/clusterframework/BootstrapTools.java   | 143 +--------------------
 .../runtime/entrypoint/ClusterEntrypoint.java      |  17 +--
 .../flink/runtime/metrics/util/MetricUtils.java    |  18 ---
 .../flink/runtime/minicluster/MiniCluster.java     |  53 +++-----
 .../runtime/resourcemanager/ResourceManager.java   |  22 +---
 .../runtime/rpc/akka/AkkaRpcServiceUtils.java      |   5 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  12 --
 .../runtime/taskexecutor/TaskExecutorGateway.java  |   8 --
 .../runtime/taskexecutor/TaskManagerRunner.java    |  24 +---
 .../org/apache/flink/runtime/akka/AkkaUtils.scala  | 123 ++++--------------
 .../runtime/taskexecutor/TaskExecutorITCase.java   |   1 -
 .../runtime/taskexecutor/TaskExecutorTest.java     |  17 +--
 .../taskexecutor/TestingTaskExecutorGateway.java   |   6 -
 .../apache/flink/runtime/akka/AkkaUtilsTest.scala  |  28 +---
 17 files changed, 66 insertions(+), 514 deletions(-)

diff --git a/docs/_includes/generated/metric_configuration.html 
b/docs/_includes/generated/metric_configuration.html
index 5f52abd..0fe9d0c 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -8,11 +8,6 @@
     </thead>
     <tbody>
         <tr>
-            <td><h5>metrics.internal.query-service.port</h5></td>
-            <td style="word-wrap: break-word;">"0"</td>
-            <td>The port range used for Flink's internal metric query service. 
Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination 
of both. It is recommended to set a range of ports to avoid collisions when 
multiple Flink components are running on the same machine. Per default Flink 
will pick a random port.</td>
-        </tr>
-        <tr>
             <td><h5>metrics.latency.granularity</h5></td>
             <td style="word-wrap: break-word;">"subtask"</td>
             <td>Defines the granularity of latency metrics. Accepted values 
are:<ul><li>single - Track latency without differentiating between sources and 
subtasks.</li><li>operator - Track latency while differentiating between 
sources, but not subtasks.</li><li>subtask - Track latency while 
differentiating between sources and subtasks.</li></ul></td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index e76b7f2..fc6b3c1 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -130,18 +130,6 @@ public class MetricOptions {
                        .defaultValue(128)
                        .withDescription("Defines the number of measured 
latencies to maintain at each operator.");
 
-       /**
-        * The default network port range for Flink's internal metric query 
service. The {@code "0"} means that
-        * Flink searches for a free port.
-        */
-       public static final ConfigOption<String> QUERY_SERVICE_PORT =
-               key("metrics.internal.query-service.port")
-               .defaultValue("0")
-               .withDescription("The port range used for Flink's internal 
metric query service. Accepts a list of ports " +
-                       "(“50100,50101”), ranges(“50100-50200”) or a 
combination of both. It is recommended to set a range of " +
-                       "ports to avoid collisions when multiple Flink 
components are running on the same machine. Per default " +
-                       "Flink will pick a random port.");
-
        private MetricOptions() {
        }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java 
b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
deleted file mode 100644
index 89dcea4..0000000
--- a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.types;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.util.NoSuchElementException;
-import java.util.Optional;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-/**
- * Serializable {@link Optional}.
- */
-public final class SerializableOptional<T extends Serializable> implements 
Serializable {
-       private static final long serialVersionUID = -3312769593551775940L;
-
-       private static final SerializableOptional<?> EMPTY = new 
SerializableOptional<>(null);
-
-       @Nullable
-       private final T value;
-
-       private SerializableOptional(@Nullable T value) {
-               this.value = value;
-       }
-
-       public T get() {
-               if (value == null) {
-                       throw new NoSuchElementException("No value present");
-               }
-               return value;
-       }
-
-       public boolean isPresent() {
-               return value != null;
-       }
-
-       public void ifPresent(Consumer<? super T> consumer) {
-               if (value != null) {
-                       consumer.accept(value);
-               }
-       }
-
-       public <R> Optional<R> map(Function<? super T, ? extends R> mapper) {
-               if (value == null) {
-                       return Optional.empty();
-               } else {
-                       return Optional.ofNullable(mapper.apply(value));
-               }
-       }
-
-       public static <T extends Serializable> SerializableOptional<T> 
of(@Nonnull T value) {
-               return new SerializableOptional<>(value);
-       }
-
-       public static <T extends Serializable> SerializableOptional<T> 
ofNullable(@Nullable T value) {
-               if (value == null) {
-                       return empty();
-               } else {
-                       return of(value);
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       public static <T extends Serializable> SerializableOptional<T> empty() {
-               return (SerializableOptional<T>) EMPTY;
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 00b6173..56e4576 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -46,7 +46,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.File;
@@ -86,66 +85,13 @@ public class BootstrapTools {
         * @param portRangeDefinition The port range to choose a port from.
         * @param logger The logger to output log information.
         * @return The ActorSystem which has been started
-        * @throws Exception Thrown when actor system cannot be started in 
specified port range
-        */
-       public static ActorSystem startActorSystem(
-               Configuration configuration,
-               String listeningAddress,
-               String portRangeDefinition,
-               Logger logger) throws Exception {
-               return startActorSystem(
-                       configuration,
-                       listeningAddress,
-                       portRangeDefinition,
-                       logger,
-                       ActorSystemExecutorMode.FORK_JOIN_EXECUTOR);
-       }
-
-       /**
-        * Starts an ActorSystem with the given configuration listening at the 
address/ports.
-        *
-        * @param configuration The Flink configuration
-        * @param listeningAddress The address to listen at.
-        * @param portRangeDefinition The port range to choose a port from.
-        * @param logger The logger to output log information.
-        * @param executorMode The executor mode of Akka actor system.
-        * @return The ActorSystem which has been started
-        * @throws Exception Thrown when actor system cannot be started in 
specified port range
-        */
-       public static ActorSystem startActorSystem(
-                       Configuration configuration,
-                       String listeningAddress,
-                       String portRangeDefinition,
-                       Logger logger,
-                       @Nonnull ActorSystemExecutorMode executorMode) throws 
Exception {
-               return startActorSystem(
-                       configuration,
-                       AkkaUtils.getFlinkActorSystemName(),
-                       listeningAddress,
-                       portRangeDefinition,
-                       logger,
-                       executorMode);
-       }
-
-       /**
-        * Starts an ActorSystem with the given configuration listening at the 
address/ports.
-        *
-        * @param configuration The Flink configuration
-        * @param actorSystemName Name of the started {@link ActorSystem}
-        * @param listeningAddress The address to listen at.
-        * @param portRangeDefinition The port range to choose a port from.
-        * @param logger The logger to output log information.
-        * @param executorMode The executor mode of Akka actor system.
-        * @return The ActorSystem which has been started
-        * @throws Exception Thrown when actor system cannot be started in 
specified port range
+        * @throws Exception
         */
        public static ActorSystem startActorSystem(
                        Configuration configuration,
-                       String actorSystemName,
                        String listeningAddress,
                        String portRangeDefinition,
-                       Logger logger,
-                       @Nonnull ActorSystemExecutorMode executorMode) throws 
Exception {
+                       Logger logger) throws Exception {
 
                // parse port range definition and create port iterator
                Iterator<Integer> portsIterator;
@@ -171,13 +117,7 @@ public class BootstrapTools {
                        }
 
                        try {
-                               return startActorSystem(
-                                       configuration,
-                                       actorSystemName,
-                                       listeningAddress,
-                                       port,
-                                       logger,
-                                       executorMode);
+                               return startActorSystem(configuration, 
listeningAddress, port, logger);
                        }
                        catch (Exception e) {
                                // we can continue to try if this contains a 
netty channel exception
@@ -196,7 +136,6 @@ public class BootstrapTools {
 
        /**
         * Starts an Actor System at a specific port.
-        *
         * @param configuration The Flink configuration.
         * @param listeningAddress The address to listen at.
         * @param listeningPort The port to listen at.
@@ -205,56 +144,10 @@ public class BootstrapTools {
         * @throws Exception
         */
        public static ActorSystem startActorSystem(
-               Configuration configuration,
-               String listeningAddress,
-               int listeningPort,
-               Logger logger) throws Exception {
-               return startActorSystem(configuration, listeningAddress, 
listeningPort, logger, ActorSystemExecutorMode.FORK_JOIN_EXECUTOR);
-       }
-
-       /**
-        * Starts an Actor System at a specific port.
-        * @param configuration The Flink configuration.
-        * @param listeningAddress The address to listen at.
-        * @param listeningPort The port to listen at.
-        * @param logger the logger to output log information.
-        * @param executorMode The executor mode of Akka actor system.
-        * @return The ActorSystem which has been started.
-        * @throws Exception
-        */
-       public static ActorSystem startActorSystem(
                                Configuration configuration,
                                String listeningAddress,
                                int listeningPort,
-                               Logger logger,
-                               ActorSystemExecutorMode executorMode) throws 
Exception {
-               return startActorSystem(
-                       configuration,
-                       AkkaUtils.getFlinkActorSystemName(),
-                       listeningAddress,
-                       listeningPort,
-                       logger,
-                       executorMode);
-       }
-
-       /**
-        * Starts an Actor System at a specific port.
-        * @param configuration The Flink configuration.
-        * @param actorSystemName Name of the started {@link ActorSystem}
-        * @param listeningAddress The address to listen at.
-        * @param listeningPort The port to listen at.
-        * @param logger the logger to output log information.
-        * @param executorMode The executor mode of Akka actor system.
-        * @return The ActorSystem which has been started.
-        * @throws Exception
-        */
-       public static ActorSystem startActorSystem(
-               Configuration configuration,
-               String actorSystemName,
-               String listeningAddress,
-               int listeningPort,
-               Logger logger,
-               ActorSystemExecutorMode executorMode) throws Exception {
+                               Logger logger) throws Exception {
 
                String hostPortUrl = 
NetUtils.unresolvedHostAndPortToNormalizedString(listeningAddress, 
listeningPort);
                logger.info("Trying to start actor system at {}", hostPortUrl);
@@ -262,13 +155,12 @@ public class BootstrapTools {
                try {
                        Config akkaConfig = AkkaUtils.getAkkaConfig(
                                configuration,
-                               new Some<>(new Tuple2<>(listeningAddress, 
listeningPort)),
-                               getExecutorConfigByExecutorMode(configuration, 
executorMode)
+                               new Some<>(new Tuple2<>(listeningAddress, 
listeningPort))
                        );
 
                        logger.debug("Using akka configuration\n {}", 
akkaConfig);
 
-                       ActorSystem actorSystem = 
AkkaUtils.createActorSystem(actorSystemName, akkaConfig);
+                       ActorSystem actorSystem = 
AkkaUtils.createActorSystem(akkaConfig);
 
                        logger.info("Actor system started at {}", 
AkkaUtils.getAddress(actorSystem));
                        return actorSystem;
@@ -278,24 +170,13 @@ public class BootstrapTools {
                                Throwable cause = t.getCause();
                                if (cause != null && t.getCause() instanceof 
BindException) {
                                        throw new IOException("Unable to create 
ActorSystem at address " + hostPortUrl +
-                                               " : " + cause.getMessage(), t);
+                                                       " : " + 
cause.getMessage(), t);
                                }
                        }
                        throw new Exception("Could not create actor system", t);
                }
        }
 
-       private static Config getExecutorConfigByExecutorMode(Configuration 
configuration, ActorSystemExecutorMode executorMode) {
-               switch (executorMode) {
-                       case FORK_JOIN_EXECUTOR:
-                               return 
AkkaUtils.getForkJoinExecutorConfig(configuration);
-                       case FIXED_THREAD_POOL_EXECUTOR:
-                               return AkkaUtils.getThreadPoolExecutorConfig();
-                       default:
-                               throw new 
IllegalArgumentException(String.format("Unknown ActorSystemExecutorMode %s.", 
executorMode));
-               }
-       }
-
        /**
         * Starts the web frontend.
         *
@@ -623,14 +504,4 @@ public class BootstrapTools {
 
                return clonedConfiguration;
        }
-
-       /**
-        * Options to specify which executor to use in an {@link ActorSystem}.
-        */
-       public enum ActorSystemExecutorMode {
-               /** Used by default, use dispatcher with fork-join-executor. **/
-               FORK_JOIN_EXECUTOR,
-               /** Use dispatcher with fixed thread pool executor. **/
-               FIXED_THREAD_POOL_EXECUTOR
-       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 2a9baa9..fd0a0a1 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -96,8 +96,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import scala.concurrent.duration.FiniteDuration;
 
-import static 
org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FORK_JOIN_EXECUTOR;
-
 /**
  * Base class for the Flink cluster entry points.
  *
@@ -157,9 +155,6 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
        private WebMonitorEndpoint<?> webMonitorEndpoint;
 
        @GuardedBy("lock")
-       private ActorSystem metricQueryServiceActorSystem;
-
-       @GuardedBy("lock")
        private ArchivedExecutionGraphStore archivedExecutionGraphStore;
 
        @GuardedBy("lock")
@@ -281,9 +276,9 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                        metricRegistry = createMetricRegistry(configuration);
 
                        // TODO: This is a temporary hack until we have ported 
the MetricQueryService to the new RpcEndpoint
-                       // Start actor system for metric query service on any 
available port
-                       metricQueryServiceActorSystem = 
MetricUtils.startMetricsActorSystem(configuration, bindAddress, LOG);
-                       
metricRegistry.startQueryService(metricQueryServiceActorSystem, null);
+                       // start the MetricQueryService
+                       final ActorSystem actorSystem = ((AkkaRpcService) 
commonRpcService).getActorSystem();
+                       metricRegistry.startQueryService(actorSystem, null);
 
                        archivedExecutionGraphStore = 
createSerializableExecutionGraphStore(configuration, 
commonRpcService.getScheduledExecutor());
 
@@ -401,7 +396,7 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                        Configuration configuration,
                        String bindAddress,
                        String portRange) throws Exception {
-               ActorSystem actorSystem = 
BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG, 
FORK_JOIN_EXECUTOR);
+               ActorSystem actorSystem = 
BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG);
                FiniteDuration duration = AkkaUtils.getTimeout(configuration);
                return new AkkaRpcService(actorSystem, 
Time.of(duration.length(), duration.unit()));
        }
@@ -469,10 +464,6 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                                
terminationFutures.add(metricRegistry.shutdown());
                        }
 
-                       if (metricQueryServiceActorSystem != null) {
-                               
terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem));
-                       }
-
                        if (commonRpcService != null) {
                                
terminationFutures.add(commonRpcService.stopService());
                        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 39e5f44..3fd268a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -18,11 +18,8 @@
 
 package org.apache.flink.runtime.metrics.util;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
@@ -30,7 +27,6 @@ import 
org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 
-import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,15 +45,12 @@ import java.lang.management.MemoryMXBean;
 import java.lang.management.ThreadMXBean;
 import java.util.List;
 
-import static 
org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FIXED_THREAD_POOL_EXECUTOR;
-
 /**
  * Utility class to register pre-defined metric sets.
  */
 public class MetricUtils {
        private static final Logger LOG = 
LoggerFactory.getLogger(MetricUtils.class);
        private static final String METRIC_GROUP_STATUS_NAME = "Status";
-       private static final String METRICS_ACTOR_SYSTEM_NAME = "flink-metrics";
 
        private MetricUtils() {
        }
@@ -109,17 +102,6 @@ public class MetricUtils {
                instantiateCPUMetrics(jvm.addGroup("CPU"));
        }
 
-       public static ActorSystem startMetricsActorSystem(Configuration 
configuration, String hostname, Logger logger) throws Exception {
-               final String portRange = 
configuration.getString(MetricOptions.QUERY_SERVICE_PORT);
-               return BootstrapTools.startActorSystem(
-                       configuration,
-                       METRICS_ACTOR_SYSTEM_NAME,
-                       hostname,
-                       portRange,
-                       logger,
-                       FIXED_THREAD_POOL_EXECUTOR);
-       }
-
        private static void instantiateNetworkMetrics(
                MetricGroup metrics,
                final NetworkEnvironment network) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index bcd4c7b..4bfdb25 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -134,9 +134,6 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
        private RpcService resourceManagerRpcService;
 
        @GuardedBy("lock")
-       private ActorSystem metricQueryServiceActorSystem;
-
-       @GuardedBy("lock")
        private HighAvailabilityServices haServices;
 
        @GuardedBy("lock")
@@ -255,11 +252,8 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                                commonRpcService = 
createRpcService(configuration, rpcTimeout, false, null);
 
                                // TODO: Temporary hack until the metric query 
service is ported to the RpcEndpoint
-                               metricQueryServiceActorSystem = 
MetricUtils.startMetricsActorSystem(
-                                       configuration,
-                                       commonRpcService.getAddress(),
-                                       LOG);
-                               
metricRegistry.startQueryService(metricQueryServiceActorSystem, null);
+                               final ActorSystem actorSystem = 
((AkkaRpcService) commonRpcService).getActorSystem();
+                               metricRegistry.startQueryService(actorSystem, 
null);
 
                                if (useSingleRpcService) {
                                        for (int i = 0; i < numTaskManagers; 
i++) {
@@ -356,7 +350,7 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                                                
configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
                                                "DispatcherRestEndpoint"),
                                        new AkkaQueryServiceRetriever(
-                                               metricQueryServiceActorSystem,
+                                               actorSystem,
                                                
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
                                        
haServices.getWebMonitorLeaderElectionService(),
                                        new ShutDownFatalErrorHandler());
@@ -453,12 +447,24 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
 
                                        final FutureUtils.ConjunctFuture<Void> 
componentsTerminationFuture = 
FutureUtils.completeAll(componentTerminationFutures);
 
-                                       final CompletableFuture<Void> 
metricSystemTerminationFuture = FutureUtils.composeAfterwards(
+                                       final CompletableFuture<Void> 
metricRegistryTerminationFuture = FutureUtils.runAfterwards(
                                                componentsTerminationFuture,
-                                               this::closeMetricSystem);
+                                               () -> {
+                                                       synchronized (lock) {
+                                                               if 
(jobManagerMetricGroup != null) {
+                                                                       
jobManagerMetricGroup.close();
+                                                                       
jobManagerMetricGroup = null;
+                                                               }
+                                                               // metrics 
shutdown
+                                                               if 
(metricRegistry != null) {
+                                                                       
metricRegistry.shutdown();
+                                                                       
metricRegistry = null;
+                                                               }
+                                                       }
+                                               });
 
                                        // shut down the RpcServices
-                                       final CompletableFuture<Void> 
rpcServicesTerminationFuture = metricSystemTerminationFuture
+                                       final CompletableFuture<Void> 
rpcServicesTerminationFuture = metricRegistryTerminationFuture
                                                .thenCompose((Void ignored) -> 
terminateRpcServices());
 
                                        final CompletableFuture<Void> 
remainingServicesTerminationFuture = FutureUtils.runAfterwards(
@@ -482,29 +488,6 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                }
        }
 
-       private CompletableFuture<Void> closeMetricSystem() {
-               synchronized (lock) {
-                       if (jobManagerMetricGroup != null) {
-                               jobManagerMetricGroup.close();
-                               jobManagerMetricGroup = null;
-                       }
-
-                       final ArrayList<CompletableFuture<Void>> 
terminationFutures = new ArrayList<>(2);
-
-                       // metrics shutdown
-                       if (metricRegistry != null) {
-                               
terminationFutures.add(metricRegistry.shutdown());
-                               metricRegistry = null;
-                       }
-
-                       if (metricQueryServiceActorSystem != null) {
-                               
terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem));
-                       }
-
-                       return FutureUtils.completeAll(terminationFutures);
-               }
-       }
-
        // 
------------------------------------------------------------------------
        //  Accessing jobs
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index a3a075d..d78e346 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -48,6 +48,7 @@ import 
org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
@@ -75,13 +76,11 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -594,26 +593,19 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
 
        @Override
        public CompletableFuture<Collection<Tuple2<ResourceID, String>>> 
requestTaskManagerMetricQueryServicePaths(Time timeout) {
-               final ArrayList<CompletableFuture<Optional<Tuple2<ResourceID, 
String>>>> metricQueryServicePathFutures = new 
ArrayList<>(taskExecutors.size());
+               final ArrayList<Tuple2<ResourceID, String>> 
metricQueryServicePaths = new ArrayList<>(taskExecutors.size());
 
                for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> 
workerRegistrationEntry : taskExecutors.entrySet()) {
                        final ResourceID tmResourceId = 
workerRegistrationEntry.getKey();
                        final WorkerRegistration<WorkerType> workerRegistration 
= workerRegistrationEntry.getValue();
-                       final TaskExecutorGateway taskExecutorGateway = 
workerRegistration.getTaskExecutorGateway();
+                       final String taskManagerAddress = 
workerRegistration.getTaskExecutorGateway().getAddress();
+                       final String tmMetricQueryServicePath = 
taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) +
+                               MetricQueryService.METRIC_QUERY_SERVICE_NAME + 
'_' + tmResourceId.getResourceIdString();
 
-                       final CompletableFuture<Optional<Tuple2<ResourceID, 
String>>> metricQueryServicePathFuture = taskExecutorGateway
-                               .requestMetricQueryServiceAddress(timeout)
-                               .thenApply(optional -> optional.map(path -> 
Tuple2.of(tmResourceId, path)));
-
-                       
metricQueryServicePathFutures.add(metricQueryServicePathFuture);
+                       metricQueryServicePaths.add(Tuple2.of(tmResourceId, 
tmMetricQueryServicePath));
                }
 
-               return 
FutureUtils.combineAll(metricQueryServicePathFutures).thenApply(
-                       collection -> collection
-                               .stream()
-                               .filter(Optional::isPresent)
-                               .map(Optional::get)
-                               .collect(Collectors.toList()));
+               return 
CompletableFuture.completedFuture(metricQueryServicePaths);
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 3ee7641..3a62698 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -70,10 +70,7 @@ public class AkkaRpcServiceUtils {
         * @throws IOException      Thrown, if the actor system can not bind to 
the address
         * @throws Exception      Thrown is some other error occurs while 
creating akka actor system
         */
-       public static RpcService createRpcService(
-               String hostname,
-               int port,
-               Configuration configuration) throws Exception {
+       public static RpcService createRpcService(String hostname, int port, 
Configuration configuration) throws Exception {
                LOG.info("Starting AkkaRpcService at {}.", 
NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port));
 
                final ActorSystem actorSystem;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index f90e939..33db7e1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -102,7 +102,6 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
@@ -158,10 +157,6 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
        private final BlobCacheService blobCacheService;
 
-       /** The path to metric query service on this Task Manager. */
-       @Nullable
-       private final String metricQueryServicePath;
-
        // --------- TaskManager services --------
 
        /** The connection information of this task manager. */
@@ -216,7 +211,6 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        TaskManagerServices taskExecutorServices,
                        HeartbeatServices heartbeatServices,
                        TaskManagerMetricGroup taskManagerMetricGroup,
-                       @Nullable String metricQueryServicePath,
                        BlobCacheService blobCacheService,
                        FatalErrorHandler fatalErrorHandler) {
 
@@ -230,7 +224,6 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
                this.taskManagerMetricGroup = 
checkNotNull(taskManagerMetricGroup);
                this.blobCacheService = checkNotNull(blobCacheService);
-               this.metricQueryServicePath = metricQueryServicePath;
 
                this.taskSlotTable = taskExecutorServices.getTaskSlotTable();
                this.jobManagerTable = 
taskExecutorServices.getJobManagerTable();
@@ -854,11 +847,6 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                }
        }
 
-       @Override
-       public CompletableFuture<SerializableOptional<String>> 
requestMetricQueryServiceAddress(Time timeout) {
-               return 
CompletableFuture.completedFuture(SerializableOptional.ofNullable(metricQueryServicePath));
-       }
-
        // 
----------------------------------------------------------------------
        // Disconnection RPCs
        // 
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index d6b9e15..4f79289 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -36,7 +36,6 @@ import 
org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.types.SerializableOptional;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -196,11 +195,4 @@ public interface TaskExecutorGateway extends RpcGateway {
         * @return Future which is completed with the {@link TransientBlobKey} 
of the uploaded file.
         */
        CompletableFuture<TransientBlobKey> requestFileUpload(FileType 
fileType, @RpcTimeout Time timeout);
-
-       /**
-        * Returns the fully qualified address of Metric Query Service on the 
TaskManager.
-        *
-        * @return Future String with Fully qualified (RPC) address of Metric 
Query Service on the TaskManager.
-        */
-       CompletableFuture<SerializableOptional<String>> 
requestMetricQueryServiceAddress(@RpcTimeout Time timeout);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index f830ae1..42fe5bf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -43,6 +43,7 @@ import 
org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -100,8 +101,6 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
 
        private final RpcService rpcService;
 
-       private final ActorSystem metricQueryServiceActorSystem;
-
        private final HighAvailabilityServices highAvailabilityServices;
 
        private final MetricRegistryImpl metricRegistry;
@@ -133,14 +132,14 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
                        
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
 
                rpcService = createRpcService(configuration, 
highAvailabilityServices);
-               metricQueryServiceActorSystem = 
MetricUtils.startMetricsActorSystem(configuration, rpcService.getAddress(), 
LOG);
 
                HeartbeatServices heartbeatServices = 
HeartbeatServices.fromConfiguration(configuration);
 
                metricRegistry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
 
                // TODO: Temporary hack until the MetricQueryService has been 
ported to RpcEndpoint
-               metricRegistry.startQueryService(metricQueryServiceActorSystem, 
resourceId);
+               final ActorSystem actorSystem = ((AkkaRpcService) 
rpcService).getActorSystem();
+               metricRegistry.startQueryService(actorSystem, resourceId);
 
                blobCacheService = new BlobCacheService(
                        configuration, 
highAvailabilityServices.createBlobStore(), null
@@ -160,7 +159,7 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
                this.terminationFuture = new CompletableFuture<>();
                this.shutdown = false;
 
-               MemoryLogger.startIfConfigured(LOG, configuration, 
metricQueryServiceActorSystem);
+               MemoryLogger.startIfConfigured(LOG, configuration, actorSystem);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -215,10 +214,6 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
                                exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                        }
 
-                       if (metricQueryServiceActorSystem != null) {
-                               
terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem));
-                       }
-
                        try {
                                highAvailabilityServices.close();
                        } catch (Exception e) {
@@ -378,8 +373,6 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
 
                TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
 
-               String metricQueryServicePath = 
metricRegistry.getMetricQueryServicePath();
-
                return new TaskExecutor(
                        rpcService,
                        taskManagerConfiguration,
@@ -387,7 +380,6 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
                        taskManagerServices,
                        heartbeatServices,
                        taskManagerMetricGroup,
-                       metricQueryServicePath,
                        blobCacheService,
                        fatalErrorHandler);
        }
@@ -424,14 +416,6 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
 
                final String portRangeDefinition = 
configuration.getString(TaskManagerOptions.RPC_PORT);
 
-               return bindWithPort(configuration, taskManagerHostname, 
portRangeDefinition);
-       }
-
-       private static RpcService bindWithPort(
-               Configuration configuration,
-               String taskManagerHostname,
-               String portRangeDefinition) throws Exception{
-
                // parse port range definition and create port iterator
                Iterator<Integer> portsIterator;
                try {
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 2b0c939..dcf0fdd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -50,12 +50,6 @@ object AkkaUtils {
 
   val INF_TIMEOUT: FiniteDuration = 21474835 seconds
 
-  val FLINK_ACTOR_SYSTEM_NAME = "flink"
-
-  def getFlinkActorSystemName = {
-    FLINK_ACTOR_SYSTEM_NAME
-  }
-
   /**
    * Creates a local actor system without remoting.
    *
@@ -109,19 +103,9 @@ object AkkaUtils {
    * @return created actor system
    */
   def createActorSystem(akkaConfig: Config): ActorSystem = {
-    createActorSystem(FLINK_ACTOR_SYSTEM_NAME, akkaConfig)
-  }
-
-  /**
-    * Creates an actor system with the given akka config.
-    *
-    * @param akkaConfig configuration for the actor system
-    * @return created actor system
-    */
-  def createActorSystem(actorSystemName: String, akkaConfig: Config): 
ActorSystem = {
     // Initialize slf4j as logger of Akka's Netty instead of java.util.logging 
(FLINK-1650)
     InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory)
-    ActorSystem.create(actorSystemName, akkaConfig)
+    ActorSystem.create("flink", akkaConfig)
   }
 
   /**
@@ -135,23 +119,7 @@ object AkkaUtils {
   }
 
   /**
-    * Returns a remote Akka config for the given configuration values.
-    *
-    * @param configuration containing the user provided configuration values
-    * @param hostname to bind against. If null, then the loopback interface is 
used
-    * @param port to bind against
-    * @param executorMode containing the user specified mode of executor
-    * @return A remote Akka config
-    */
-  def getAkkaConfig(configuration: Configuration,
-                    hostname: String,
-                    port: Int,
-                    executorConfig: Config): Config = {
-    getAkkaConfig(configuration, Some((hostname, port)), executorConfig)
-  }
-
-  /**
-    * Returns a remote Akka config for the given configuration values.
+    * Return a remote Akka config for the given configuration values.
     *
     * @param configuration containing the user provided configuration values
     * @param hostname to bind against. If null, then the loopback interface is 
used
@@ -187,25 +155,7 @@ object AkkaUtils {
   @throws(classOf[UnknownHostException])
   def getAkkaConfig(configuration: Configuration,
                     externalAddress: Option[(String, Int)]): Config = {
-    getAkkaConfig(configuration, externalAddress, 
getForkJoinExecutorConfig(configuration))
-  }
-
-  /**
-    * Creates an akka config with the provided configuration values. If the 
listening address is
-    * specified, then the actor system will listen on the respective address.
-    *
-    * @param configuration instance containing the user provided configuration 
values
-    * @param externalAddress optional tuple of bindAddress and port to be 
reachable at.
-    *                        If None is given, then an Akka config for local 
actor system
-    *                        will be returned
-    * @param executorConfig config defining the used executor by the default 
dispatcher
-    * @return Akka config
-    */
-  @throws(classOf[UnknownHostException])
-  def getAkkaConfig(configuration: Configuration,
-                    externalAddress: Option[(String, Int)],
-                    executorConfig: Config): Config = {
-    val defaultConfig = 
getBasicAkkaConfig(configuration).withFallback(executorConfig)
+    val defaultConfig = getBasicAkkaConfig(configuration)
 
     externalAddress match {
 
@@ -257,6 +207,24 @@ object AkkaUtils {
     val supervisorStrategy = 
classOf[StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
       .getCanonicalName
 
+    val forkJoinExecutorParallelismFactor =
+      
configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR)
+
+    val forkJoinExecutorParallelismMin =
+      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN)
+
+    val forkJoinExecutorParallelismMax =
+      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX)
+
+    val forkJoinExecutorConfig =
+      s"""
+         | fork-join-executor {
+         |   parallelism-factor = $forkJoinExecutorParallelismFactor
+         |   parallelism-min = $forkJoinExecutorParallelismMin
+         |   parallelism-max = $forkJoinExecutorParallelismMax
+         | }
+       """.stripMargin
+
     val config =
       s"""
         |akka {
@@ -283,6 +251,8 @@ object AkkaUtils {
         |
         |   default-dispatcher {
         |     throughput = $akkaThroughput
+        |
+        |   $forkJoinExecutorConfig
         |   }
         | }
         |}
@@ -291,53 +261,6 @@ object AkkaUtils {
     ConfigFactory.parseString(config)
   }
 
-  def getThreadPoolExecutorConfig: Config = {
-    val configString = s"""
-       |akka {
-       |  actor {
-       |    default-dispatcher {
-       |      executor = "thread-pool-executor"
-       |      thread-pool-executor {
-       |        core-pool-size-min = 2
-       |        core-pool-size-factor = 2.0
-       |        core-pool-size-max = 4
-       |      }
-       |    }
-       |  }
-       |}
-        """.
-      stripMargin
-
-    ConfigFactory.parseString(configString)
-  }
-
-  def getForkJoinExecutorConfig(configuration: Configuration): Config = {
-    val forkJoinExecutorParallelismFactor =
-      
configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR)
-
-    val forkJoinExecutorParallelismMin =
-      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN)
-
-    val forkJoinExecutorParallelismMax =
-      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX)
-
-    val configString = s"""
-       |akka {
-       |  actor {
-       |    default-dispatcher {
-       |      executor = "fork-join-executor"
-       |      fork-join-executor {
-       |        parallelism-factor = $forkJoinExecutorParallelismFactor
-       |        parallelism-min = $forkJoinExecutorParallelismMin
-       |        parallelism-max = $forkJoinExecutorParallelismMax
-       |      }
-       |    }
-       |  }
-       |}""".stripMargin
-
-    ConfigFactory.parseString(configString)
-  }
-
   def testDispatcherConfig: Config = {
     val config =
       s"""
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 3ee1b92..6e0f9c5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -165,7 +165,6 @@ public class TaskExecutorITCase extends TestLogger {
                        taskManagerServices,
                        heartbeatServices,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        new BlobCacheService(
                                configuration,
                                new VoidBlobStore(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 179dea2..4af0529 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -279,7 +279,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        heartbeatServices,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -370,7 +369,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        heartbeatServices,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -486,7 +484,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        heartbeatServices,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -565,7 +562,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -629,7 +625,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -752,7 +747,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -852,7 +846,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -884,7 +877,7 @@ public class TaskExecutorTest extends TestLogger {
                        // the job leader should get the allocation id offered
                        verify(jobMasterGateway, 
Mockito.timeout(timeout.toMilliseconds())).offerSlots(
                                        any(ResourceID.class),
-                                       
(Collection<SlotOffer>)Matchers.argThat(contains(slotOffer)),
+                                       (Collection<SlotOffer>) 
Matchers.argThat(contains(slotOffer)),
                                        any(Time.class));
                } finally {
                        RpcUtils.terminateRpcEndpoint(taskManager, timeout);
@@ -967,7 +960,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1062,7 +1054,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1169,7 +1160,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        heartbeatServicesMock,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1243,7 +1233,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        heartbeatServices,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1300,7 +1289,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1393,7 +1381,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        new HeartbeatServices(heartbeatInterval, 10L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1501,7 +1488,6 @@ public class TaskExecutorTest extends TestLogger {
                                .build(),
                        new HeartbeatServices(heartbeatInterval, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1713,7 +1699,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 5fd12a8..a9e9949 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
-import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.Preconditions;
 
 import java.util.concurrent.CompletableFuture;
@@ -151,11 +150,6 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
        }
 
        @Override
-       public CompletableFuture<SerializableOptional<String>> 
requestMetricQueryServiceAddress(Time timeout) {
-               return 
CompletableFuture.completedFuture(SerializableOptional.of(address));
-       }
-
-       @Override
        public String getAddress() {
                return address;
        }
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index e5c1668..d02a554 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.akka
 
-import java.net.{InetAddress, InetSocketAddress}
+import java.net.InetSocketAddress
 
 import org.apache.flink.configuration.{AkkaOptions, Configuration, 
IllegalConfigurationException}
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
@@ -167,30 +167,4 @@ class AkkaUtilsTest
     akkaConfig.getString("akka.remote.netty.tcp.hostname") should
       equal(NetUtils.unresolvedHostToNormalizedString(hostname))
   }
-
-  test("null hostname should go to localhost") {
-    val configure = AkkaUtils.getAkkaConfig(new Configuration(), Some((null, 
1772)))
-
-    val hostname = configure.getString("akka.remote.netty.tcp.hostname")
-
-    InetAddress.getByName(hostname).isLoopbackAddress should be(true)
-  }
-
-  test("getAkkaConfig defaults to fork-join-executor") {
-    val akkaConfig = AkkaUtils.getAkkaConfig(new Configuration())
-
-    akkaConfig.getString("akka.actor.default-dispatcher.executor") should
-      equal("fork-join-executor")
-  }
-
-  test("getAkkaConfig respects executor config") {
-    val akkaConfig = AkkaUtils.getAkkaConfig(
-      new Configuration(),
-      "localhost",
-      1234,
-      AkkaUtils.getThreadPoolExecutorConfig)
-
-    akkaConfig.getString("akka.actor.default-dispatcher.executor") should
-      equal("thread-pool-executor")
-  }
 }

Reply via email to