This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 814d9984ed459f77c01e9ac5971cff17978a8e89
Author: Till Rohrmann <[email protected]>
AuthorDate: Tue Oct 16 13:58:15 2018 +0200

    Revert "[FLINK-10247] Run MetricQueryService in separate ActorSystem"
    
    This reverts commit 1368d1be56dc4c73506580eda3d38f0a965edd3c.
---
 docs/_includes/generated/metric_configuration.html |   5 -
 .../apache/flink/configuration/MetricOptions.java  |  12 --
 .../apache/flink/types/SerializableOptional.java   |  86 -------------
 .../runtime/clusterframework/BootstrapTools.java   | 143 +--------------------
 .../runtime/entrypoint/ClusterEntrypoint.java      |  17 +--
 .../flink/runtime/metrics/util/MetricUtils.java    |  18 ---
 .../flink/runtime/minicluster/MiniCluster.java     |  53 +++-----
 .../runtime/resourcemanager/ResourceManager.java   |  22 +---
 .../runtime/rpc/akka/AkkaRpcServiceUtils.java      |   5 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  12 --
 .../runtime/taskexecutor/TaskExecutorGateway.java  |   8 --
 .../runtime/taskexecutor/TaskManagerRunner.java    |  24 +---
 .../org/apache/flink/runtime/akka/AkkaUtils.scala  | 123 ++++--------------
 .../runtime/taskexecutor/TaskExecutorITCase.java   |   1 -
 .../runtime/taskexecutor/TaskExecutorTest.java     |  17 +--
 .../taskexecutor/TestingTaskExecutorGateway.java   |   6 -
 .../apache/flink/runtime/akka/AkkaUtilsTest.scala  |  28 +---
 17 files changed, 66 insertions(+), 514 deletions(-)

diff --git a/docs/_includes/generated/metric_configuration.html 
b/docs/_includes/generated/metric_configuration.html
index cba93e4..99992b2 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -8,11 +8,6 @@
     </thead>
     <tbody>
         <tr>
-            <td><h5>metrics.internal.query-service.port</h5></td>
-            <td style="word-wrap: break-word;">"0"</td>
-            <td>The port range used for Flink's internal metric query service. 
Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination 
of both. It is recommended to set a range of ports to avoid collisions when 
multiple Flink components are running on the same machine. Per default Flink 
will pick a random port.</td>
-        </tr>
-        <tr>
             <td><h5>metrics.latency.granularity</h5></td>
             <td style="word-wrap: break-word;">"subtask"</td>
             <td>Defines the granularity of latency metrics. Accepted values 
are "single"(Track latency without differentiating between sources and 
subtasks), "operator"(Track latency while differentiating between sources, but 
not subtasks) and "subtask"(Track latency while differentiating between sources 
and subtasks).</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index bbf06eb..81744a2 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -125,18 +125,6 @@ public class MetricOptions {
                        .defaultValue(128)
                        .withDescription("Defines the number of measured 
latencies to maintain at each operator.");
 
-       /**
-        * The default network port range for Flink's internal metric query 
service. The {@code "0"} means that
-        * Flink searches for a free port.
-        */
-       public static final ConfigOption<String> QUERY_SERVICE_PORT =
-               key("metrics.internal.query-service.port")
-               .defaultValue("0")
-               .withDescription("The port range used for Flink's internal 
metric query service. Accepts a list of ports " +
-                       "(“50100,50101”), ranges(“50100-50200”) or a 
combination of both. It is recommended to set a range of " +
-                       "ports to avoid collisions when multiple Flink 
components are running on the same machine. Per default " +
-                       "Flink will pick a random port.");
-
        private MetricOptions() {
        }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java 
b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
deleted file mode 100644
index 89dcea4..0000000
--- a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.types;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.util.NoSuchElementException;
-import java.util.Optional;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-/**
- * Serializable {@link Optional}.
- */
-public final class SerializableOptional<T extends Serializable> implements 
Serializable {
-       private static final long serialVersionUID = -3312769593551775940L;
-
-       private static final SerializableOptional<?> EMPTY = new 
SerializableOptional<>(null);
-
-       @Nullable
-       private final T value;
-
-       private SerializableOptional(@Nullable T value) {
-               this.value = value;
-       }
-
-       public T get() {
-               if (value == null) {
-                       throw new NoSuchElementException("No value present");
-               }
-               return value;
-       }
-
-       public boolean isPresent() {
-               return value != null;
-       }
-
-       public void ifPresent(Consumer<? super T> consumer) {
-               if (value != null) {
-                       consumer.accept(value);
-               }
-       }
-
-       public <R> Optional<R> map(Function<? super T, ? extends R> mapper) {
-               if (value == null) {
-                       return Optional.empty();
-               } else {
-                       return Optional.ofNullable(mapper.apply(value));
-               }
-       }
-
-       public static <T extends Serializable> SerializableOptional<T> 
of(@Nonnull T value) {
-               return new SerializableOptional<>(value);
-       }
-
-       public static <T extends Serializable> SerializableOptional<T> 
ofNullable(@Nullable T value) {
-               if (value == null) {
-                       return empty();
-               } else {
-                       return of(value);
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       public static <T extends Serializable> SerializableOptional<T> empty() {
-               return (SerializableOptional<T>) EMPTY;
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 00b6173..56e4576 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -46,7 +46,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.File;
@@ -86,66 +85,13 @@ public class BootstrapTools {
         * @param portRangeDefinition The port range to choose a port from.
         * @param logger The logger to output log information.
         * @return The ActorSystem which has been started
-        * @throws Exception Thrown when actor system cannot be started in 
specified port range
-        */
-       public static ActorSystem startActorSystem(
-               Configuration configuration,
-               String listeningAddress,
-               String portRangeDefinition,
-               Logger logger) throws Exception {
-               return startActorSystem(
-                       configuration,
-                       listeningAddress,
-                       portRangeDefinition,
-                       logger,
-                       ActorSystemExecutorMode.FORK_JOIN_EXECUTOR);
-       }
-
-       /**
-        * Starts an ActorSystem with the given configuration listening at the 
address/ports.
-        *
-        * @param configuration The Flink configuration
-        * @param listeningAddress The address to listen at.
-        * @param portRangeDefinition The port range to choose a port from.
-        * @param logger The logger to output log information.
-        * @param executorMode The executor mode of Akka actor system.
-        * @return The ActorSystem which has been started
-        * @throws Exception Thrown when actor system cannot be started in 
specified port range
-        */
-       public static ActorSystem startActorSystem(
-                       Configuration configuration,
-                       String listeningAddress,
-                       String portRangeDefinition,
-                       Logger logger,
-                       @Nonnull ActorSystemExecutorMode executorMode) throws 
Exception {
-               return startActorSystem(
-                       configuration,
-                       AkkaUtils.getFlinkActorSystemName(),
-                       listeningAddress,
-                       portRangeDefinition,
-                       logger,
-                       executorMode);
-       }
-
-       /**
-        * Starts an ActorSystem with the given configuration listening at the 
address/ports.
-        *
-        * @param configuration The Flink configuration
-        * @param actorSystemName Name of the started {@link ActorSystem}
-        * @param listeningAddress The address to listen at.
-        * @param portRangeDefinition The port range to choose a port from.
-        * @param logger The logger to output log information.
-        * @param executorMode The executor mode of Akka actor system.
-        * @return The ActorSystem which has been started
-        * @throws Exception Thrown when actor system cannot be started in 
specified port range
+        * @throws Exception
         */
        public static ActorSystem startActorSystem(
                        Configuration configuration,
-                       String actorSystemName,
                        String listeningAddress,
                        String portRangeDefinition,
-                       Logger logger,
-                       @Nonnull ActorSystemExecutorMode executorMode) throws 
Exception {
+                       Logger logger) throws Exception {
 
                // parse port range definition and create port iterator
                Iterator<Integer> portsIterator;
@@ -171,13 +117,7 @@ public class BootstrapTools {
                        }
 
                        try {
-                               return startActorSystem(
-                                       configuration,
-                                       actorSystemName,
-                                       listeningAddress,
-                                       port,
-                                       logger,
-                                       executorMode);
+                               return startActorSystem(configuration, 
listeningAddress, port, logger);
                        }
                        catch (Exception e) {
                                // we can continue to try if this contains a 
netty channel exception
@@ -196,7 +136,6 @@ public class BootstrapTools {
 
        /**
         * Starts an Actor System at a specific port.
-        *
         * @param configuration The Flink configuration.
         * @param listeningAddress The address to listen at.
         * @param listeningPort The port to listen at.
@@ -205,56 +144,10 @@ public class BootstrapTools {
         * @throws Exception
         */
        public static ActorSystem startActorSystem(
-               Configuration configuration,
-               String listeningAddress,
-               int listeningPort,
-               Logger logger) throws Exception {
-               return startActorSystem(configuration, listeningAddress, 
listeningPort, logger, ActorSystemExecutorMode.FORK_JOIN_EXECUTOR);
-       }
-
-       /**
-        * Starts an Actor System at a specific port.
-        * @param configuration The Flink configuration.
-        * @param listeningAddress The address to listen at.
-        * @param listeningPort The port to listen at.
-        * @param logger the logger to output log information.
-        * @param executorMode The executor mode of Akka actor system.
-        * @return The ActorSystem which has been started.
-        * @throws Exception
-        */
-       public static ActorSystem startActorSystem(
                                Configuration configuration,
                                String listeningAddress,
                                int listeningPort,
-                               Logger logger,
-                               ActorSystemExecutorMode executorMode) throws 
Exception {
-               return startActorSystem(
-                       configuration,
-                       AkkaUtils.getFlinkActorSystemName(),
-                       listeningAddress,
-                       listeningPort,
-                       logger,
-                       executorMode);
-       }
-
-       /**
-        * Starts an Actor System at a specific port.
-        * @param configuration The Flink configuration.
-        * @param actorSystemName Name of the started {@link ActorSystem}
-        * @param listeningAddress The address to listen at.
-        * @param listeningPort The port to listen at.
-        * @param logger the logger to output log information.
-        * @param executorMode The executor mode of Akka actor system.
-        * @return The ActorSystem which has been started.
-        * @throws Exception
-        */
-       public static ActorSystem startActorSystem(
-               Configuration configuration,
-               String actorSystemName,
-               String listeningAddress,
-               int listeningPort,
-               Logger logger,
-               ActorSystemExecutorMode executorMode) throws Exception {
+                               Logger logger) throws Exception {
 
                String hostPortUrl = 
NetUtils.unresolvedHostAndPortToNormalizedString(listeningAddress, 
listeningPort);
                logger.info("Trying to start actor system at {}", hostPortUrl);
@@ -262,13 +155,12 @@ public class BootstrapTools {
                try {
                        Config akkaConfig = AkkaUtils.getAkkaConfig(
                                configuration,
-                               new Some<>(new Tuple2<>(listeningAddress, 
listeningPort)),
-                               getExecutorConfigByExecutorMode(configuration, 
executorMode)
+                               new Some<>(new Tuple2<>(listeningAddress, 
listeningPort))
                        );
 
                        logger.debug("Using akka configuration\n {}", 
akkaConfig);
 
-                       ActorSystem actorSystem = 
AkkaUtils.createActorSystem(actorSystemName, akkaConfig);
+                       ActorSystem actorSystem = 
AkkaUtils.createActorSystem(akkaConfig);
 
                        logger.info("Actor system started at {}", 
AkkaUtils.getAddress(actorSystem));
                        return actorSystem;
@@ -278,24 +170,13 @@ public class BootstrapTools {
                                Throwable cause = t.getCause();
                                if (cause != null && t.getCause() instanceof 
BindException) {
                                        throw new IOException("Unable to create 
ActorSystem at address " + hostPortUrl +
-                                               " : " + cause.getMessage(), t);
+                                                       " : " + 
cause.getMessage(), t);
                                }
                        }
                        throw new Exception("Could not create actor system", t);
                }
        }
 
-       private static Config getExecutorConfigByExecutorMode(Configuration 
configuration, ActorSystemExecutorMode executorMode) {
-               switch (executorMode) {
-                       case FORK_JOIN_EXECUTOR:
-                               return 
AkkaUtils.getForkJoinExecutorConfig(configuration);
-                       case FIXED_THREAD_POOL_EXECUTOR:
-                               return AkkaUtils.getThreadPoolExecutorConfig();
-                       default:
-                               throw new 
IllegalArgumentException(String.format("Unknown ActorSystemExecutorMode %s.", 
executorMode));
-               }
-       }
-
        /**
         * Starts the web frontend.
         *
@@ -623,14 +504,4 @@ public class BootstrapTools {
 
                return clonedConfiguration;
        }
-
-       /**
-        * Options to specify which executor to use in an {@link ActorSystem}.
-        */
-       public enum ActorSystemExecutorMode {
-               /** Used by default, use dispatcher with fork-join-executor. **/
-               FORK_JOIN_EXECUTOR,
-               /** Use dispatcher with fixed thread pool executor. **/
-               FIXED_THREAD_POOL_EXECUTOR
-       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 53349e2..2c34b53 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -95,8 +95,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import scala.concurrent.duration.FiniteDuration;
 
-import static 
org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FORK_JOIN_EXECUTOR;
-
 /**
  * Base class for the Flink cluster entry points.
  *
@@ -156,9 +154,6 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
        private WebMonitorEndpoint<?> webMonitorEndpoint;
 
        @GuardedBy("lock")
-       private ActorSystem metricQueryServiceActorSystem;
-
-       @GuardedBy("lock")
        private ArchivedExecutionGraphStore archivedExecutionGraphStore;
 
        @GuardedBy("lock")
@@ -280,9 +275,9 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                        metricRegistry = createMetricRegistry(configuration);
 
                        // TODO: This is a temporary hack until we have ported 
the MetricQueryService to the new RpcEndpoint
-                       // Start actor system for metric query service on any 
available port
-                       metricQueryServiceActorSystem = 
MetricUtils.startMetricsActorSystem(configuration, bindAddress, LOG);
-                       
metricRegistry.startQueryService(metricQueryServiceActorSystem, null);
+                       // start the MetricQueryService
+                       final ActorSystem actorSystem = ((AkkaRpcService) 
commonRpcService).getActorSystem();
+                       metricRegistry.startQueryService(actorSystem, null);
 
                        archivedExecutionGraphStore = 
createSerializableExecutionGraphStore(configuration, 
commonRpcService.getScheduledExecutor());
 
@@ -400,7 +395,7 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                        Configuration configuration,
                        String bindAddress,
                        String portRange) throws Exception {
-               ActorSystem actorSystem = 
BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG, 
FORK_JOIN_EXECUTOR);
+               ActorSystem actorSystem = 
BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG);
                FiniteDuration duration = AkkaUtils.getTimeout(configuration);
                return new AkkaRpcService(actorSystem, 
Time.of(duration.length(), duration.unit()));
        }
@@ -468,10 +463,6 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                                
terminationFutures.add(metricRegistry.shutdown());
                        }
 
-                       if (metricQueryServiceActorSystem != null) {
-                               
terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem));
-                       }
-
                        if (commonRpcService != null) {
                                
terminationFutures.add(commonRpcService.stopService());
                        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 39e5f44..3fd268a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -18,11 +18,8 @@
 
 package org.apache.flink.runtime.metrics.util;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
@@ -30,7 +27,6 @@ import 
org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 
-import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,15 +45,12 @@ import java.lang.management.MemoryMXBean;
 import java.lang.management.ThreadMXBean;
 import java.util.List;
 
-import static 
org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FIXED_THREAD_POOL_EXECUTOR;
-
 /**
  * Utility class to register pre-defined metric sets.
  */
 public class MetricUtils {
        private static final Logger LOG = 
LoggerFactory.getLogger(MetricUtils.class);
        private static final String METRIC_GROUP_STATUS_NAME = "Status";
-       private static final String METRICS_ACTOR_SYSTEM_NAME = "flink-metrics";
 
        private MetricUtils() {
        }
@@ -109,17 +102,6 @@ public class MetricUtils {
                instantiateCPUMetrics(jvm.addGroup("CPU"));
        }
 
-       public static ActorSystem startMetricsActorSystem(Configuration 
configuration, String hostname, Logger logger) throws Exception {
-               final String portRange = 
configuration.getString(MetricOptions.QUERY_SERVICE_PORT);
-               return BootstrapTools.startActorSystem(
-                       configuration,
-                       METRICS_ACTOR_SYSTEM_NAME,
-                       hostname,
-                       portRange,
-                       logger,
-                       FIXED_THREAD_POOL_EXECUTOR);
-       }
-
        private static void instantiateNetworkMetrics(
                MetricGroup metrics,
                final NetworkEnvironment network) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 46f7ec3..a7840d6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -136,9 +136,6 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
        private RpcService resourceManagerRpcService;
 
        @GuardedBy("lock")
-       private ActorSystem metricQueryServiceActorSystem;
-
-       @GuardedBy("lock")
        private HighAvailabilityServices haServices;
 
        @GuardedBy("lock")
@@ -257,11 +254,8 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                                commonRpcService = 
createRpcService(configuration, rpcTimeout, false, null);
 
                                // TODO: Temporary hack until the metric query 
service is ported to the RpcEndpoint
-                               metricQueryServiceActorSystem = 
MetricUtils.startMetricsActorSystem(
-                                       configuration,
-                                       commonRpcService.getAddress(),
-                                       LOG);
-                               
metricRegistry.startQueryService(metricQueryServiceActorSystem, null);
+                               final ActorSystem actorSystem = 
((AkkaRpcService) commonRpcService).getActorSystem();
+                               metricRegistry.startQueryService(actorSystem, 
null);
 
                                if (useSingleRpcService) {
                                        for (int i = 0; i < numTaskManagers; 
i++) {
@@ -358,7 +352,7 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                                                
configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
                                                "DispatcherRestEndpoint"),
                                        new AkkaQueryServiceRetriever(
-                                               metricQueryServiceActorSystem,
+                                               actorSystem,
                                                
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
                                        
haServices.getWebMonitorLeaderElectionService(),
                                        new ShutDownFatalErrorHandler());
@@ -455,12 +449,24 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
 
                                        final FutureUtils.ConjunctFuture<Void> 
componentsTerminationFuture = 
FutureUtils.completeAll(componentTerminationFutures);
 
-                                       final CompletableFuture<Void> 
metricSystemTerminationFuture = FutureUtils.composeAfterwards(
+                                       final CompletableFuture<Void> 
metricRegistryTerminationFuture = FutureUtils.runAfterwards(
                                                componentsTerminationFuture,
-                                               this::closeMetricSystem);
+                                               () -> {
+                                                       synchronized (lock) {
+                                                               if 
(jobManagerMetricGroup != null) {
+                                                                       
jobManagerMetricGroup.close();
+                                                                       
jobManagerMetricGroup = null;
+                                                               }
+                                                               // metrics 
shutdown
+                                                               if 
(metricRegistry != null) {
+                                                                       
metricRegistry.shutdown();
+                                                                       
metricRegistry = null;
+                                                               }
+                                                       }
+                                               });
 
                                        // shut down the RpcServices
-                                       final CompletableFuture<Void> 
rpcServicesTerminationFuture = metricSystemTerminationFuture
+                                       final CompletableFuture<Void> 
rpcServicesTerminationFuture = metricRegistryTerminationFuture
                                                .thenCompose((Void ignored) -> 
terminateRpcServices());
 
                                        final CompletableFuture<Void> 
remainingServicesTerminationFuture = FutureUtils.runAfterwards(
@@ -484,29 +490,6 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                }
        }
 
-       private CompletableFuture<Void> closeMetricSystem() {
-               synchronized (lock) {
-                       if (jobManagerMetricGroup != null) {
-                               jobManagerMetricGroup.close();
-                               jobManagerMetricGroup = null;
-                       }
-
-                       final ArrayList<CompletableFuture<Void>> 
terminationFutures = new ArrayList<>(2);
-
-                       // metrics shutdown
-                       if (metricRegistry != null) {
-                               
terminationFutures.add(metricRegistry.shutdown());
-                               metricRegistry = null;
-                       }
-
-                       if (metricQueryServiceActorSystem != null) {
-                               
terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem));
-                       }
-
-                       return FutureUtils.completeAll(terminationFutures);
-               }
-       }
-
        // 
------------------------------------------------------------------------
        //  Accessing jobs
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index a3a075d..d78e346 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -48,6 +48,7 @@ import 
org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
@@ -75,13 +76,11 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -594,26 +593,19 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
 
        @Override
        public CompletableFuture<Collection<Tuple2<ResourceID, String>>> 
requestTaskManagerMetricQueryServicePaths(Time timeout) {
-               final ArrayList<CompletableFuture<Optional<Tuple2<ResourceID, 
String>>>> metricQueryServicePathFutures = new 
ArrayList<>(taskExecutors.size());
+               final ArrayList<Tuple2<ResourceID, String>> 
metricQueryServicePaths = new ArrayList<>(taskExecutors.size());
 
                for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> 
workerRegistrationEntry : taskExecutors.entrySet()) {
                        final ResourceID tmResourceId = 
workerRegistrationEntry.getKey();
                        final WorkerRegistration<WorkerType> workerRegistration 
= workerRegistrationEntry.getValue();
-                       final TaskExecutorGateway taskExecutorGateway = 
workerRegistration.getTaskExecutorGateway();
+                       final String taskManagerAddress = 
workerRegistration.getTaskExecutorGateway().getAddress();
+                       final String tmMetricQueryServicePath = 
taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) +
+                               MetricQueryService.METRIC_QUERY_SERVICE_NAME + 
'_' + tmResourceId.getResourceIdString();
 
-                       final CompletableFuture<Optional<Tuple2<ResourceID, 
String>>> metricQueryServicePathFuture = taskExecutorGateway
-                               .requestMetricQueryServiceAddress(timeout)
-                               .thenApply(optional -> optional.map(path -> 
Tuple2.of(tmResourceId, path)));
-
-                       
metricQueryServicePathFutures.add(metricQueryServicePathFuture);
+                       metricQueryServicePaths.add(Tuple2.of(tmResourceId, 
tmMetricQueryServicePath));
                }
 
-               return 
FutureUtils.combineAll(metricQueryServicePathFutures).thenApply(
-                       collection -> collection
-                               .stream()
-                               .filter(Optional::isPresent)
-                               .map(Optional::get)
-                               .collect(Collectors.toList()));
+               return 
CompletableFuture.completedFuture(metricQueryServicePaths);
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index ac1a187..1f7ec12 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -70,10 +70,7 @@ public class AkkaRpcServiceUtils {
         * @throws IOException      Thrown, if the actor system can not bind to 
the address
         * @throws Exception      Thrown is some other error occurs while 
creating akka actor system
         */
-       public static RpcService createRpcService(
-               String hostname,
-               int port,
-               Configuration configuration) throws Exception {
+       public static RpcService createRpcService(String hostname, int port, 
Configuration configuration) throws Exception {
                LOG.info("Starting AkkaRpcService at {}.", 
NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port));
 
                final ActorSystem actorSystem;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 3d06efd..665f072 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -101,7 +101,6 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
@@ -157,10 +156,6 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
        private final BlobCacheService blobCacheService;
 
-       /** The path to metric query service on this Task Manager. */
-       @Nullable
-       private final String metricQueryServicePath;
-
        // --------- TaskManager services --------
 
        /** The connection information of this task manager. */
@@ -213,7 +208,6 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        TaskManagerServices taskExecutorServices,
                        HeartbeatServices heartbeatServices,
                        TaskManagerMetricGroup taskManagerMetricGroup,
-                       @Nullable String metricQueryServicePath,
                        BlobCacheService blobCacheService,
                        FatalErrorHandler fatalErrorHandler) {
 
@@ -227,7 +221,6 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
                this.taskManagerMetricGroup = 
checkNotNull(taskManagerMetricGroup);
                this.blobCacheService = checkNotNull(blobCacheService);
-               this.metricQueryServicePath = metricQueryServicePath;
 
                this.taskSlotTable = taskExecutorServices.getTaskSlotTable();
                this.jobManagerTable = 
taskExecutorServices.getJobManagerTable();
@@ -848,11 +841,6 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                }
        }
 
-       @Override
-       public CompletableFuture<SerializableOptional<String>> 
requestMetricQueryServiceAddress(Time timeout) {
-               return 
CompletableFuture.completedFuture(SerializableOptional.ofNullable(metricQueryServicePath));
-       }
-
        // 
----------------------------------------------------------------------
        // Disconnection RPCs
        // 
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index d6b9e15..4f79289 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -36,7 +36,6 @@ import 
org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.types.SerializableOptional;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -196,11 +195,4 @@ public interface TaskExecutorGateway extends RpcGateway {
         * @return Future which is completed with the {@link TransientBlobKey} 
of the uploaded file.
         */
        CompletableFuture<TransientBlobKey> requestFileUpload(FileType 
fileType, @RpcTimeout Time timeout);
-
-       /**
-        * Returns the fully qualified address of Metric Query Service on the 
TaskManager.
-        *
-        * @return Future String with Fully qualified (RPC) address of Metric 
Query Service on the TaskManager.
-        */
-       CompletableFuture<SerializableOptional<String>> 
requestMetricQueryServiceAddress(@RpcTimeout Time timeout);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 2d17883..1cd61fe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -96,8 +97,6 @@ public class TaskManagerRunner implements FatalErrorHandler, 
AutoCloseableAsync
 
        private final RpcService rpcService;
 
-       private final ActorSystem metricQueryServiceActorSystem;
-
        private final HighAvailabilityServices highAvailabilityServices;
 
        private final MetricRegistryImpl metricRegistry;
@@ -129,14 +128,14 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
                        
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
 
                rpcService = createRpcService(configuration, 
highAvailabilityServices);
-               metricQueryServiceActorSystem = 
MetricUtils.startMetricsActorSystem(configuration, rpcService.getAddress(), 
LOG);
 
                HeartbeatServices heartbeatServices = 
HeartbeatServices.fromConfiguration(configuration);
 
                metricRegistry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
 
                // TODO: Temporary hack until the MetricQueryService has been 
ported to RpcEndpoint
-               metricRegistry.startQueryService(metricQueryServiceActorSystem, 
resourceId);
+               final ActorSystem actorSystem = ((AkkaRpcService) 
rpcService).getActorSystem();
+               metricRegistry.startQueryService(actorSystem, resourceId);
 
                blobCacheService = new BlobCacheService(
                        configuration, 
highAvailabilityServices.createBlobStore(), null
@@ -156,7 +155,7 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
                this.terminationFuture = new CompletableFuture<>();
                this.shutdown = false;
 
-               MemoryLogger.startIfConfigured(LOG, configuration, 
metricQueryServiceActorSystem);
+               MemoryLogger.startIfConfigured(LOG, configuration, actorSystem);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -211,10 +210,6 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
                                exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                        }
 
-                       if (metricQueryServiceActorSystem != null) {
-                               
terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem));
-                       }
-
                        try {
                                highAvailabilityServices.close();
                        } catch (Exception e) {
@@ -361,8 +356,6 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
 
                TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
 
-               String metricQueryServicePath = 
metricRegistry.getMetricQueryServicePath();
-
                return new TaskExecutor(
                        rpcService,
                        taskManagerConfiguration,
@@ -370,7 +363,6 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
                        taskManagerServices,
                        heartbeatServices,
                        taskManagerMetricGroup,
-                       metricQueryServicePath,
                        blobCacheService,
                        fatalErrorHandler);
        }
@@ -407,14 +399,6 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
 
                final String portRangeDefinition = 
configuration.getString(TaskManagerOptions.RPC_PORT);
 
-               return bindWithPort(configuration, taskManagerHostname, 
portRangeDefinition);
-       }
-
-       private static RpcService bindWithPort(
-               Configuration configuration,
-               String taskManagerHostname,
-               String portRangeDefinition) throws Exception{
-
                // parse port range definition and create port iterator
                Iterator<Integer> portsIterator;
                try {
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 39b287e..6e161f7 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -50,12 +50,6 @@ object AkkaUtils {
 
   val INF_TIMEOUT: FiniteDuration = 21474835 seconds
 
-  val FLINK_ACTOR_SYSTEM_NAME = "flink"
-
-  def getFlinkActorSystemName = {
-    FLINK_ACTOR_SYSTEM_NAME
-  }
-
   /**
    * Creates a local actor system without remoting.
    *
@@ -109,19 +103,9 @@ object AkkaUtils {
    * @return created actor system
    */
   def createActorSystem(akkaConfig: Config): ActorSystem = {
-    createActorSystem(FLINK_ACTOR_SYSTEM_NAME, akkaConfig)
-  }
-
-  /**
-    * Creates an actor system with the given akka config.
-    *
-    * @param akkaConfig configuration for the actor system
-    * @return created actor system
-    */
-  def createActorSystem(actorSystemName: String, akkaConfig: Config): 
ActorSystem = {
     // Initialize slf4j as logger of Akka's Netty instead of java.util.logging 
(FLINK-1650)
     InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory)
-    ActorSystem.create(actorSystemName, akkaConfig)
+    ActorSystem.create("flink", akkaConfig)
   }
 
   /**
@@ -135,23 +119,7 @@ object AkkaUtils {
   }
 
   /**
-    * Returns a remote Akka config for the given configuration values.
-    *
-    * @param configuration containing the user provided configuration values
-    * @param hostname to bind against. If null, then the loopback interface is 
used
-    * @param port to bind against
-    * @param executorMode containing the user specified mode of executor
-    * @return A remote Akka config
-    */
-  def getAkkaConfig(configuration: Configuration,
-                    hostname: String,
-                    port: Int,
-                    executorConfig: Config): Config = {
-    getAkkaConfig(configuration, Some((hostname, port)), executorConfig)
-  }
-
-  /**
-    * Returns a remote Akka config for the given configuration values.
+    * Return a remote Akka config for the given configuration values.
     *
     * @param configuration containing the user provided configuration values
     * @param hostname to bind against. If null, then the loopback interface is 
used
@@ -187,25 +155,7 @@ object AkkaUtils {
   @throws(classOf[UnknownHostException])
   def getAkkaConfig(configuration: Configuration,
                     externalAddress: Option[(String, Int)]): Config = {
-    getAkkaConfig(configuration, externalAddress, 
getForkJoinExecutorConfig(configuration))
-  }
-
-  /**
-    * Creates an akka config with the provided configuration values. If the 
listening address is
-    * specified, then the actor system will listen on the respective address.
-    *
-    * @param configuration instance containing the user provided configuration 
values
-    * @param externalAddress optional tuple of bindAddress and port to be 
reachable at.
-    *                        If None is given, then an Akka config for local 
actor system
-    *                        will be returned
-    * @param executorConfig config defining the used executor by the default 
dispatcher
-    * @return Akka config
-    */
-  @throws(classOf[UnknownHostException])
-  def getAkkaConfig(configuration: Configuration,
-                    externalAddress: Option[(String, Int)],
-                    executorConfig: Config): Config = {
-    val defaultConfig = 
getBasicAkkaConfig(configuration).withFallback(executorConfig)
+    val defaultConfig = getBasicAkkaConfig(configuration)
 
     externalAddress match {
 
@@ -257,6 +207,24 @@ object AkkaUtils {
     val supervisorStrategy = 
classOf[StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
       .getCanonicalName
 
+    val forkJoinExecutorParallelismFactor =
+      
configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR)
+
+    val forkJoinExecutorParallelismMin =
+      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN)
+
+    val forkJoinExecutorParallelismMax =
+      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX)
+
+    val forkJoinExecutorConfig =
+      s"""
+         | fork-join-executor {
+         |   parallelism-factor = $forkJoinExecutorParallelismFactor
+         |   parallelism-min = $forkJoinExecutorParallelismMin
+         |   parallelism-max = $forkJoinExecutorParallelismMax
+         | }
+       """.stripMargin
+
     val config =
       s"""
         |akka {
@@ -283,6 +251,8 @@ object AkkaUtils {
         |
         |   default-dispatcher {
         |     throughput = $akkaThroughput
+        |
+        |   $forkJoinExecutorConfig
         |   }
         | }
         |}
@@ -291,53 +261,6 @@ object AkkaUtils {
     ConfigFactory.parseString(config)
   }
 
-  def getThreadPoolExecutorConfig: Config = {
-    val configString = s"""
-       |akka {
-       |  actor {
-       |    default-dispatcher {
-       |      executor = "thread-pool-executor"
-       |      thread-pool-executor {
-       |        core-pool-size-min = 2
-       |        core-pool-size-factor = 2.0
-       |        core-pool-size-max = 4
-       |      }
-       |    }
-       |  }
-       |}
-        """.
-      stripMargin
-
-    ConfigFactory.parseString(configString)
-  }
-
-  def getForkJoinExecutorConfig(configuration: Configuration): Config = {
-    val forkJoinExecutorParallelismFactor =
-      
configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR)
-
-    val forkJoinExecutorParallelismMin =
-      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN)
-
-    val forkJoinExecutorParallelismMax =
-      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX)
-
-    val configString = s"""
-       |akka {
-       |  actor {
-       |    default-dispatcher {
-       |      executor = "fork-join-executor"
-       |      fork-join-executor {
-       |        parallelism-factor = $forkJoinExecutorParallelismFactor
-       |        parallelism-min = $forkJoinExecutorParallelismMin
-       |        parallelism-max = $forkJoinExecutorParallelismMax
-       |      }
-       |    }
-       |  }
-       |}""".stripMargin
-
-    ConfigFactory.parseString(configString)
-  }
-
   def testDispatcherConfig: Config = {
     val config =
       s"""
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 3ee1b92..6e0f9c5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -165,7 +165,6 @@ public class TaskExecutorITCase extends TestLogger {
                        taskManagerServices,
                        heartbeatServices,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        new BlobCacheService(
                                configuration,
                                new VoidBlobStore(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 179dea2..4af0529 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -279,7 +279,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        heartbeatServices,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -370,7 +369,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        heartbeatServices,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -486,7 +484,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        heartbeatServices,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -565,7 +562,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -629,7 +625,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -752,7 +747,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -852,7 +846,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -884,7 +877,7 @@ public class TaskExecutorTest extends TestLogger {
                        // the job leader should get the allocation id offered
                        verify(jobMasterGateway, 
Mockito.timeout(timeout.toMilliseconds())).offerSlots(
                                        any(ResourceID.class),
-                                       
(Collection<SlotOffer>)Matchers.argThat(contains(slotOffer)),
+                                       (Collection<SlotOffer>) 
Matchers.argThat(contains(slotOffer)),
                                        any(Time.class));
                } finally {
                        RpcUtils.terminateRpcEndpoint(taskManager, timeout);
@@ -967,7 +960,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1062,7 +1054,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1169,7 +1160,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        heartbeatServicesMock,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1243,7 +1233,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        heartbeatServices,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1300,7 +1289,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1393,7 +1381,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        new HeartbeatServices(heartbeatInterval, 10L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1501,7 +1488,6 @@ public class TaskExecutorTest extends TestLogger {
                                .build(),
                        new HeartbeatServices(heartbeatInterval, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1713,7 +1699,6 @@ public class TaskExecutorTest extends TestLogger {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 5fd12a8..a9e9949 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
-import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.Preconditions;
 
 import java.util.concurrent.CompletableFuture;
@@ -151,11 +150,6 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
        }
 
        @Override
-       public CompletableFuture<SerializableOptional<String>> 
requestMetricQueryServiceAddress(Time timeout) {
-               return 
CompletableFuture.completedFuture(SerializableOptional.of(address));
-       }
-
-       @Override
        public String getAddress() {
                return address;
        }
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index e5c1668..d02a554 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.akka
 
-import java.net.{InetAddress, InetSocketAddress}
+import java.net.InetSocketAddress
 
 import org.apache.flink.configuration.{AkkaOptions, Configuration, 
IllegalConfigurationException}
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
@@ -167,30 +167,4 @@ class AkkaUtilsTest
     akkaConfig.getString("akka.remote.netty.tcp.hostname") should
       equal(NetUtils.unresolvedHostToNormalizedString(hostname))
   }
-
-  test("null hostname should go to localhost") {
-    val configure = AkkaUtils.getAkkaConfig(new Configuration(), Some((null, 
1772)))
-
-    val hostname = configure.getString("akka.remote.netty.tcp.hostname")
-
-    InetAddress.getByName(hostname).isLoopbackAddress should be(true)
-  }
-
-  test("getAkkaConfig defaults to fork-join-executor") {
-    val akkaConfig = AkkaUtils.getAkkaConfig(new Configuration())
-
-    akkaConfig.getString("akka.actor.default-dispatcher.executor") should
-      equal("fork-join-executor")
-  }
-
-  test("getAkkaConfig respects executor config") {
-    val akkaConfig = AkkaUtils.getAkkaConfig(
-      new Configuration(),
-      "localhost",
-      1234,
-      AkkaUtils.getThreadPoolExecutorConfig)
-
-    akkaConfig.getString("akka.actor.default-dispatcher.executor") should
-      equal("thread-pool-executor")
-  }
 }

Reply via email to