This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 814d9984ed459f77c01e9ac5971cff17978a8e89 Author: Till Rohrmann <[email protected]> AuthorDate: Tue Oct 16 13:58:15 2018 +0200 Revert "[FLINK-10247] Run MetricQueryService in separate ActorSystem" This reverts commit 1368d1be56dc4c73506580eda3d38f0a965edd3c. --- docs/_includes/generated/metric_configuration.html | 5 - .../apache/flink/configuration/MetricOptions.java | 12 -- .../apache/flink/types/SerializableOptional.java | 86 ------------- .../runtime/clusterframework/BootstrapTools.java | 143 +-------------------- .../runtime/entrypoint/ClusterEntrypoint.java | 17 +-- .../flink/runtime/metrics/util/MetricUtils.java | 18 --- .../flink/runtime/minicluster/MiniCluster.java | 53 +++----- .../runtime/resourcemanager/ResourceManager.java | 22 +--- .../runtime/rpc/akka/AkkaRpcServiceUtils.java | 5 +- .../flink/runtime/taskexecutor/TaskExecutor.java | 12 -- .../runtime/taskexecutor/TaskExecutorGateway.java | 8 -- .../runtime/taskexecutor/TaskManagerRunner.java | 24 +--- .../org/apache/flink/runtime/akka/AkkaUtils.scala | 123 ++++-------------- .../runtime/taskexecutor/TaskExecutorITCase.java | 1 - .../runtime/taskexecutor/TaskExecutorTest.java | 17 +-- .../taskexecutor/TestingTaskExecutorGateway.java | 6 - .../apache/flink/runtime/akka/AkkaUtilsTest.scala | 28 +--- 17 files changed, 66 insertions(+), 514 deletions(-) diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html index cba93e4..99992b2 100644 --- a/docs/_includes/generated/metric_configuration.html +++ b/docs/_includes/generated/metric_configuration.html @@ -8,11 +8,6 @@ </thead> <tbody> <tr> - <td><h5>metrics.internal.query-service.port</h5></td> - <td style="word-wrap: break-word;">"0"</td> - <td>The port range used for Flink's internal metric query service. Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. Per default Flink will pick a random port.</td> - </tr> - <tr> <td><h5>metrics.latency.granularity</h5></td> <td style="word-wrap: break-word;">"subtask"</td> <td>Defines the granularity of latency metrics. Accepted values are "single"(Track latency without differentiating between sources and subtasks), "operator"(Track latency while differentiating between sources, but not subtasks) and "subtask"(Track latency while differentiating between sources and subtasks).</td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java index bbf06eb..81744a2 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java @@ -125,18 +125,6 @@ public class MetricOptions { .defaultValue(128) .withDescription("Defines the number of measured latencies to maintain at each operator."); - /** - * The default network port range for Flink's internal metric query service. The {@code "0"} means that - * Flink searches for a free port. - */ - public static final ConfigOption<String> QUERY_SERVICE_PORT = - key("metrics.internal.query-service.port") - .defaultValue("0") - .withDescription("The port range used for Flink's internal metric query service. Accepts a list of ports " + - "(“50100,50101”), ranges(“50100-50200”) or a combination of both. 
It is recommended to set a range of " + - "ports to avoid collisions when multiple Flink components are running on the same machine. Per default " + - "Flink will pick a random port."); - private MetricOptions() { } } diff --git a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java deleted file mode 100644 index 89dcea4..0000000 --- a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.types; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -import java.io.Serializable; -import java.util.NoSuchElementException; -import java.util.Optional; -import java.util.function.Consumer; -import java.util.function.Function; - -/** - * Serializable {@link Optional}. - */ -public final class SerializableOptional<T extends Serializable> implements Serializable { - private static final long serialVersionUID = -3312769593551775940L; - - private static final SerializableOptional<?> EMPTY = new SerializableOptional<>(null); - - @Nullable - private final T value; - - private SerializableOptional(@Nullable T value) { - this.value = value; - } - - public T get() { - if (value == null) { - throw new NoSuchElementException("No value present"); - } - return value; - } - - public boolean isPresent() { - return value != null; - } - - public void ifPresent(Consumer<? super T> consumer) { - if (value != null) { - consumer.accept(value); - } - } - - public <R> Optional<R> map(Function<? super T, ? 
extends R> mapper) { - if (value == null) { - return Optional.empty(); - } else { - return Optional.ofNullable(mapper.apply(value)); - } - } - - public static <T extends Serializable> SerializableOptional<T> of(@Nonnull T value) { - return new SerializableOptional<>(value); - } - - public static <T extends Serializable> SerializableOptional<T> ofNullable(@Nullable T value) { - if (value == null) { - return empty(); - } else { - return of(value); - } - } - - @SuppressWarnings("unchecked") - public static <T extends Serializable> SerializableOptional<T> empty() { - return (SerializableOptional<T>) EMPTY; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index 00b6173..56e4576 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -46,7 +46,6 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; @@ -86,66 +85,13 @@ public class BootstrapTools { * @param portRangeDefinition The port range to choose a port from. * @param logger The logger to output log information. * @return The ActorSystem which has been started - * @throws Exception Thrown when actor system cannot be started in specified port range - */ - public static ActorSystem startActorSystem( - Configuration configuration, - String listeningAddress, - String portRangeDefinition, - Logger logger) throws Exception { - return startActorSystem( - configuration, - listeningAddress, - portRangeDefinition, - logger, - ActorSystemExecutorMode.FORK_JOIN_EXECUTOR); - } - - /** - * Starts an ActorSystem with the given configuration listening at the address/ports. - * - * @param configuration The Flink configuration - * @param listeningAddress The address to listen at. - * @param portRangeDefinition The port range to choose a port from. - * @param logger The logger to output log information. - * @param executorMode The executor mode of Akka actor system. - * @return The ActorSystem which has been started - * @throws Exception Thrown when actor system cannot be started in specified port range - */ - public static ActorSystem startActorSystem( - Configuration configuration, - String listeningAddress, - String portRangeDefinition, - Logger logger, - @Nonnull ActorSystemExecutorMode executorMode) throws Exception { - return startActorSystem( - configuration, - AkkaUtils.getFlinkActorSystemName(), - listeningAddress, - portRangeDefinition, - logger, - executorMode); - } - - /** - * Starts an ActorSystem with the given configuration listening at the address/ports. - * - * @param configuration The Flink configuration - * @param actorSystemName Name of the started {@link ActorSystem} - * @param listeningAddress The address to listen at. - * @param portRangeDefinition The port range to choose a port from. - * @param logger The logger to output log information. - * @param executorMode The executor mode of Akka actor system. 
- * @return The ActorSystem which has been started - * @throws Exception Thrown when actor system cannot be started in specified port range + * @throws Exception */ public static ActorSystem startActorSystem( Configuration configuration, - String actorSystemName, String listeningAddress, String portRangeDefinition, - Logger logger, - @Nonnull ActorSystemExecutorMode executorMode) throws Exception { + Logger logger) throws Exception { // parse port range definition and create port iterator Iterator<Integer> portsIterator; @@ -171,13 +117,7 @@ public class BootstrapTools { } try { - return startActorSystem( - configuration, - actorSystemName, - listeningAddress, - port, - logger, - executorMode); + return startActorSystem(configuration, listeningAddress, port, logger); } catch (Exception e) { // we can continue to try if this contains a netty channel exception @@ -196,7 +136,6 @@ public class BootstrapTools { /** * Starts an Actor System at a specific port. - * * @param configuration The Flink configuration. * @param listeningAddress The address to listen at. * @param listeningPort The port to listen at. @@ -205,56 +144,10 @@ public class BootstrapTools { * @throws Exception */ public static ActorSystem startActorSystem( - Configuration configuration, - String listeningAddress, - int listeningPort, - Logger logger) throws Exception { - return startActorSystem(configuration, listeningAddress, listeningPort, logger, ActorSystemExecutorMode.FORK_JOIN_EXECUTOR); - } - - /** - * Starts an Actor System at a specific port. - * @param configuration The Flink configuration. - * @param listeningAddress The address to listen at. - * @param listeningPort The port to listen at. - * @param logger the logger to output log information. - * @param executorMode The executor mode of Akka actor system. - * @return The ActorSystem which has been started. - * @throws Exception - */ - public static ActorSystem startActorSystem( Configuration configuration, String listeningAddress, int listeningPort, - Logger logger, - ActorSystemExecutorMode executorMode) throws Exception { - return startActorSystem( - configuration, - AkkaUtils.getFlinkActorSystemName(), - listeningAddress, - listeningPort, - logger, - executorMode); - } - - /** - * Starts an Actor System at a specific port. - * @param configuration The Flink configuration. - * @param actorSystemName Name of the started {@link ActorSystem} - * @param listeningAddress The address to listen at. - * @param listeningPort The port to listen at. - * @param logger the logger to output log information. - * @param executorMode The executor mode of Akka actor system. - * @return The ActorSystem which has been started. 
- * @throws Exception - */ - public static ActorSystem startActorSystem( - Configuration configuration, - String actorSystemName, - String listeningAddress, - int listeningPort, - Logger logger, - ActorSystemExecutorMode executorMode) throws Exception { + Logger logger) throws Exception { String hostPortUrl = NetUtils.unresolvedHostAndPortToNormalizedString(listeningAddress, listeningPort); logger.info("Trying to start actor system at {}", hostPortUrl); @@ -262,13 +155,12 @@ public class BootstrapTools { try { Config akkaConfig = AkkaUtils.getAkkaConfig( configuration, - new Some<>(new Tuple2<>(listeningAddress, listeningPort)), - getExecutorConfigByExecutorMode(configuration, executorMode) + new Some<>(new Tuple2<>(listeningAddress, listeningPort)) ); logger.debug("Using akka configuration\n {}", akkaConfig); - ActorSystem actorSystem = AkkaUtils.createActorSystem(actorSystemName, akkaConfig); + ActorSystem actorSystem = AkkaUtils.createActorSystem(akkaConfig); logger.info("Actor system started at {}", AkkaUtils.getAddress(actorSystem)); return actorSystem; @@ -278,24 +170,13 @@ public class BootstrapTools { Throwable cause = t.getCause(); if (cause != null && t.getCause() instanceof BindException) { throw new IOException("Unable to create ActorSystem at address " + hostPortUrl + - " : " + cause.getMessage(), t); + " : " + cause.getMessage(), t); } } throw new Exception("Could not create actor system", t); } } - private static Config getExecutorConfigByExecutorMode(Configuration configuration, ActorSystemExecutorMode executorMode) { - switch (executorMode) { - case FORK_JOIN_EXECUTOR: - return AkkaUtils.getForkJoinExecutorConfig(configuration); - case FIXED_THREAD_POOL_EXECUTOR: - return AkkaUtils.getThreadPoolExecutorConfig(); - default: - throw new IllegalArgumentException(String.format("Unknown ActorSystemExecutorMode %s.", executorMode)); - } - } - /** * Starts the web frontend. * @@ -623,14 +504,4 @@ public class BootstrapTools { return clonedConfiguration; } - - /** - * Options to specify which executor to use in an {@link ActorSystem}. - */ - public enum ActorSystemExecutorMode { - /** Used by default, use dispatcher with fork-join-executor. **/ - FORK_JOIN_EXECUTOR, - /** Use dispatcher with fixed thread pool executor. **/ - FIXED_THREAD_POOL_EXECUTOR - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 53349e2..2c34b53 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -95,8 +95,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import scala.concurrent.duration.FiniteDuration; -import static org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FORK_JOIN_EXECUTOR; - /** * Base class for the Flink cluster entry points. 
* @@ -156,9 +154,6 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { private WebMonitorEndpoint<?> webMonitorEndpoint; @GuardedBy("lock") - private ActorSystem metricQueryServiceActorSystem; - - @GuardedBy("lock") private ArchivedExecutionGraphStore archivedExecutionGraphStore; @GuardedBy("lock") @@ -280,9 +275,9 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { metricRegistry = createMetricRegistry(configuration); // TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint - // Start actor system for metric query service on any available port - metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(configuration, bindAddress, LOG); - metricRegistry.startQueryService(metricQueryServiceActorSystem, null); + // start the MetricQueryService + final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem(); + metricRegistry.startQueryService(actorSystem, null); archivedExecutionGraphStore = createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor()); @@ -400,7 +395,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { Configuration configuration, String bindAddress, String portRange) throws Exception { - ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG, FORK_JOIN_EXECUTOR); + ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG); FiniteDuration duration = AkkaUtils.getTimeout(configuration); return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit())); } @@ -468,10 +463,6 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { terminationFutures.add(metricRegistry.shutdown()); } - if (metricQueryServiceActorSystem != null) { - terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem)); - } - if (commonRpcService != null) { terminationFutures.add(commonRpcService.stopService()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java index 39e5f44..3fd268a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java @@ -18,11 +18,8 @@ package org.apache.flink.runtime.metrics.util; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; @@ -30,7 +27,6 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.Preconditions; -import akka.actor.ActorSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,15 +45,12 @@ import java.lang.management.MemoryMXBean; import java.lang.management.ThreadMXBean; import java.util.List; -import static org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FIXED_THREAD_POOL_EXECUTOR; - /** * Utility class to register pre-defined metric sets. 
*/ public class MetricUtils { private static final Logger LOG = LoggerFactory.getLogger(MetricUtils.class); private static final String METRIC_GROUP_STATUS_NAME = "Status"; - private static final String METRICS_ACTOR_SYSTEM_NAME = "flink-metrics"; private MetricUtils() { } @@ -109,17 +102,6 @@ public class MetricUtils { instantiateCPUMetrics(jvm.addGroup("CPU")); } - public static ActorSystem startMetricsActorSystem(Configuration configuration, String hostname, Logger logger) throws Exception { - final String portRange = configuration.getString(MetricOptions.QUERY_SERVICE_PORT); - return BootstrapTools.startActorSystem( - configuration, - METRICS_ACTOR_SYSTEM_NAME, - hostname, - portRange, - logger, - FIXED_THREAD_POOL_EXECUTOR); - } - private static void instantiateNetworkMetrics( MetricGroup metrics, final NetworkEnvironment network) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 46f7ec3..a7840d6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -136,9 +136,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { private RpcService resourceManagerRpcService; @GuardedBy("lock") - private ActorSystem metricQueryServiceActorSystem; - - @GuardedBy("lock") private HighAvailabilityServices haServices; @GuardedBy("lock") @@ -257,11 +254,8 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { commonRpcService = createRpcService(configuration, rpcTimeout, false, null); // TODO: Temporary hack until the metric query service is ported to the RpcEndpoint - metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem( - configuration, - commonRpcService.getAddress(), - LOG); - metricRegistry.startQueryService(metricQueryServiceActorSystem, null); + final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem(); + metricRegistry.startQueryService(actorSystem, null); if (useSingleRpcService) { for (int i = 0; i < numTaskManagers; i++) { @@ -358,7 +352,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1), "DispatcherRestEndpoint"), new AkkaQueryServiceRetriever( - metricQueryServiceActorSystem, + actorSystem, Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))), haServices.getWebMonitorLeaderElectionService(), new ShutDownFatalErrorHandler()); @@ -455,12 +449,24 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { final FutureUtils.ConjunctFuture<Void> componentsTerminationFuture = FutureUtils.completeAll(componentTerminationFutures); - final CompletableFuture<Void> metricSystemTerminationFuture = FutureUtils.composeAfterwards( + final CompletableFuture<Void> metricRegistryTerminationFuture = FutureUtils.runAfterwards( componentsTerminationFuture, - this::closeMetricSystem); + () -> { + synchronized (lock) { + if (jobManagerMetricGroup != null) { + jobManagerMetricGroup.close(); + jobManagerMetricGroup = null; + } + // metrics shutdown + if (metricRegistry != null) { + metricRegistry.shutdown(); + metricRegistry = null; + } + } + }); // shut down the RpcServices - final CompletableFuture<Void> rpcServicesTerminationFuture = metricSystemTerminationFuture + final CompletableFuture<Void> rpcServicesTerminationFuture = 
metricRegistryTerminationFuture .thenCompose((Void ignored) -> terminateRpcServices()); final CompletableFuture<Void> remainingServicesTerminationFuture = FutureUtils.runAfterwards( @@ -484,29 +490,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { } } - private CompletableFuture<Void> closeMetricSystem() { - synchronized (lock) { - if (jobManagerMetricGroup != null) { - jobManagerMetricGroup.close(); - jobManagerMetricGroup = null; - } - - final ArrayList<CompletableFuture<Void>> terminationFutures = new ArrayList<>(2); - - // metrics shutdown - if (metricRegistry != null) { - terminationFutures.add(metricRegistry.shutdown()); - metricRegistry = null; - } - - if (metricQueryServiceActorSystem != null) { - terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem)); - } - - return FutureUtils.completeAll(terminationFutures); - } - } - // ------------------------------------------------------------------------ // Accessing jobs // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index a3a075d..d78e346 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -48,6 +48,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; @@ -75,13 +76,11 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -594,26 +593,19 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> @Override public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) { - final ArrayList<CompletableFuture<Optional<Tuple2<ResourceID, String>>>> metricQueryServicePathFutures = new ArrayList<>(taskExecutors.size()); + final ArrayList<Tuple2<ResourceID, String>> metricQueryServicePaths = new ArrayList<>(taskExecutors.size()); for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> workerRegistrationEntry : taskExecutors.entrySet()) { final ResourceID tmResourceId = workerRegistrationEntry.getKey(); final WorkerRegistration<WorkerType> workerRegistration = workerRegistrationEntry.getValue(); - final TaskExecutorGateway taskExecutorGateway = workerRegistration.getTaskExecutorGateway(); + final String taskManagerAddress = workerRegistration.getTaskExecutorGateway().getAddress(); + final String tmMetricQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 
1) + + MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + tmResourceId.getResourceIdString(); - final CompletableFuture<Optional<Tuple2<ResourceID, String>>> metricQueryServicePathFuture = taskExecutorGateway - .requestMetricQueryServiceAddress(timeout) - .thenApply(optional -> optional.map(path -> Tuple2.of(tmResourceId, path))); - - metricQueryServicePathFutures.add(metricQueryServicePathFuture); + metricQueryServicePaths.add(Tuple2.of(tmResourceId, tmMetricQueryServicePath)); } - return FutureUtils.combineAll(metricQueryServicePathFutures).thenApply( - collection -> collection - .stream() - .filter(Optional::isPresent) - .map(Optional::get) - .collect(Collectors.toList())); + return CompletableFuture.completedFuture(metricQueryServicePaths); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java index ac1a187..1f7ec12 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java @@ -70,10 +70,7 @@ public class AkkaRpcServiceUtils { * @throws IOException Thrown, if the actor system can not bind to the address * @throws Exception Thrown is some other error occurs while creating akka actor system */ - public static RpcService createRpcService( - String hostname, - int port, - Configuration configuration) throws Exception { + public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception { LOG.info("Starting AkkaRpcService at {}.", NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port)); final ActorSystem actorSystem; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 3d06efd..665f072 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -101,7 +101,6 @@ import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.types.SerializableOptional; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -157,10 +156,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { private final BlobCacheService blobCacheService; - /** The path to metric query service on this Task Manager. */ - @Nullable - private final String metricQueryServicePath; - // --------- TaskManager services -------- /** The connection information of this task manager. 
*/ @@ -213,7 +208,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { TaskManagerServices taskExecutorServices, HeartbeatServices heartbeatServices, TaskManagerMetricGroup taskManagerMetricGroup, - @Nullable String metricQueryServicePath, BlobCacheService blobCacheService, FatalErrorHandler fatalErrorHandler) { @@ -227,7 +221,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { this.fatalErrorHandler = checkNotNull(fatalErrorHandler); this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup); this.blobCacheService = checkNotNull(blobCacheService); - this.metricQueryServicePath = metricQueryServicePath; this.taskSlotTable = taskExecutorServices.getTaskSlotTable(); this.jobManagerTable = taskExecutorServices.getJobManagerTable(); @@ -848,11 +841,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } } - @Override - public CompletableFuture<SerializableOptional<String>> requestMetricQueryServiceAddress(Time timeout) { - return CompletableFuture.completedFuture(SerializableOptional.ofNullable(metricQueryServicePath)); - } - // ---------------------------------------------------------------------- // Disconnection RPCs // ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index d6b9e15..4f79289 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -36,7 +36,6 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.taskmanager.Task; -import org.apache.flink.types.SerializableOptional; import java.util.concurrent.CompletableFuture; @@ -196,11 +195,4 @@ public interface TaskExecutorGateway extends RpcGateway { * @return Future which is completed with the {@link TransientBlobKey} of the uploaded file. */ CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType, @RpcTimeout Time timeout); - - /** - * Returns the fully qualified address of Metric Query Service on the TaskManager. - * - * @return Future String with Fully qualified (RPC) address of Metric Query Service on the TaskManager. 
- */ - CompletableFuture<SerializableOptional<String>> requestMetricQueryServiceAddress(@RpcTimeout Time timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 2d17883..1cd61fe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.util.MetricUtils; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; @@ -96,8 +97,6 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync private final RpcService rpcService; - private final ActorSystem metricQueryServiceActorSystem; - private final HighAvailabilityServices highAvailabilityServices; private final MetricRegistryImpl metricRegistry; @@ -129,14 +128,14 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION); rpcService = createRpcService(configuration, highAvailabilityServices); - metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(configuration, rpcService.getAddress(), LOG); HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration); metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration)); // TODO: Temporary hack until the MetricQueryService has been ported to RpcEndpoint - metricRegistry.startQueryService(metricQueryServiceActorSystem, resourceId); + final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem(); + metricRegistry.startQueryService(actorSystem, resourceId); blobCacheService = new BlobCacheService( configuration, highAvailabilityServices.createBlobStore(), null @@ -156,7 +155,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync this.terminationFuture = new CompletableFuture<>(); this.shutdown = false; - MemoryLogger.startIfConfigured(LOG, configuration, metricQueryServiceActorSystem); + MemoryLogger.startIfConfigured(LOG, configuration, actorSystem); } // -------------------------------------------------------------------------------------------- @@ -211,10 +210,6 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync exception = ExceptionUtils.firstOrSuppressed(e, exception); } - if (metricQueryServiceActorSystem != null) { - terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem)); - } - try { highAvailabilityServices.close(); } catch (Exception e) { @@ -361,8 +356,6 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); - String metricQueryServicePath = metricRegistry.getMetricQueryServicePath(); - return new TaskExecutor( rpcService, taskManagerConfiguration, @@ -370,7 +363,6 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync 
taskManagerServices, heartbeatServices, taskManagerMetricGroup, - metricQueryServicePath, blobCacheService, fatalErrorHandler); } @@ -407,14 +399,6 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT); - return bindWithPort(configuration, taskManagerHostname, portRangeDefinition); - } - - private static RpcService bindWithPort( - Configuration configuration, - String taskManagerHostname, - String portRangeDefinition) throws Exception{ - // parse port range definition and create port iterator Iterator<Integer> portsIterator; try { diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 39b287e..6e161f7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -50,12 +50,6 @@ object AkkaUtils { val INF_TIMEOUT: FiniteDuration = 21474835 seconds - val FLINK_ACTOR_SYSTEM_NAME = "flink" - - def getFlinkActorSystemName = { - FLINK_ACTOR_SYSTEM_NAME - } - /** * Creates a local actor system without remoting. * @@ -109,19 +103,9 @@ object AkkaUtils { * @return created actor system */ def createActorSystem(akkaConfig: Config): ActorSystem = { - createActorSystem(FLINK_ACTOR_SYSTEM_NAME, akkaConfig) - } - - /** - * Creates an actor system with the given akka config. - * - * @param akkaConfig configuration for the actor system - * @return created actor system - */ - def createActorSystem(actorSystemName: String, akkaConfig: Config): ActorSystem = { // Initialize slf4j as logger of Akka's Netty instead of java.util.logging (FLINK-1650) InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory) - ActorSystem.create(actorSystemName, akkaConfig) + ActorSystem.create("flink", akkaConfig) } /** @@ -135,23 +119,7 @@ object AkkaUtils { } /** - * Returns a remote Akka config for the given configuration values. - * - * @param configuration containing the user provided configuration values - * @param hostname to bind against. If null, then the loopback interface is used - * @param port to bind against - * @param executorMode containing the user specified mode of executor - * @return A remote Akka config - */ - def getAkkaConfig(configuration: Configuration, - hostname: String, - port: Int, - executorConfig: Config): Config = { - getAkkaConfig(configuration, Some((hostname, port)), executorConfig) - } - - /** - * Returns a remote Akka config for the given configuration values. + * Return a remote Akka config for the given configuration values. * * @param configuration containing the user provided configuration values * @param hostname to bind against. If null, then the loopback interface is used @@ -187,25 +155,7 @@ object AkkaUtils { @throws(classOf[UnknownHostException]) def getAkkaConfig(configuration: Configuration, externalAddress: Option[(String, Int)]): Config = { - getAkkaConfig(configuration, externalAddress, getForkJoinExecutorConfig(configuration)) - } - - /** - * Creates an akka config with the provided configuration values. If the listening address is - * specified, then the actor system will listen on the respective address. - * - * @param configuration instance containing the user provided configuration values - * @param externalAddress optional tuple of bindAddress and port to be reachable at. 
- * If None is given, then an Akka config for local actor system - * will be returned - * @param executorConfig config defining the used executor by the default dispatcher - * @return Akka config - */ - @throws(classOf[UnknownHostException]) - def getAkkaConfig(configuration: Configuration, - externalAddress: Option[(String, Int)], - executorConfig: Config): Config = { - val defaultConfig = getBasicAkkaConfig(configuration).withFallback(executorConfig) + val defaultConfig = getBasicAkkaConfig(configuration) externalAddress match { @@ -257,6 +207,24 @@ object AkkaUtils { val supervisorStrategy = classOf[StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy] .getCanonicalName + val forkJoinExecutorParallelismFactor = + configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR) + + val forkJoinExecutorParallelismMin = + configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN) + + val forkJoinExecutorParallelismMax = + configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX) + + val forkJoinExecutorConfig = + s""" + | fork-join-executor { + | parallelism-factor = $forkJoinExecutorParallelismFactor + | parallelism-min = $forkJoinExecutorParallelismMin + | parallelism-max = $forkJoinExecutorParallelismMax + | } + """.stripMargin + val config = s""" |akka { @@ -283,6 +251,8 @@ object AkkaUtils { | | default-dispatcher { | throughput = $akkaThroughput + | + | $forkJoinExecutorConfig | } | } |} @@ -291,53 +261,6 @@ object AkkaUtils { ConfigFactory.parseString(config) } - def getThreadPoolExecutorConfig: Config = { - val configString = s""" - |akka { - | actor { - | default-dispatcher { - | executor = "thread-pool-executor" - | thread-pool-executor { - | core-pool-size-min = 2 - | core-pool-size-factor = 2.0 - | core-pool-size-max = 4 - | } - | } - | } - |} - """. 
- stripMargin - - ConfigFactory.parseString(configString) - } - - def getForkJoinExecutorConfig(configuration: Configuration): Config = { - val forkJoinExecutorParallelismFactor = - configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR) - - val forkJoinExecutorParallelismMin = - configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN) - - val forkJoinExecutorParallelismMax = - configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX) - - val configString = s""" - |akka { - | actor { - | default-dispatcher { - | executor = "fork-join-executor" - | fork-join-executor { - | parallelism-factor = $forkJoinExecutorParallelismFactor - | parallelism-min = $forkJoinExecutorParallelismMin - | parallelism-max = $forkJoinExecutorParallelismMax - | } - | } - | } - |}""".stripMargin - - ConfigFactory.parseString(configString) - } - def testDispatcherConfig: Config = { val config = s""" diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 3ee1b92..6e0f9c5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -165,7 +165,6 @@ public class TaskExecutorITCase extends TestLogger { taskManagerServices, heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, new BlobCacheService( configuration, new VoidBlobStore(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 179dea2..4af0529 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -279,7 +279,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -370,7 +369,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -486,7 +484,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -565,7 +562,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -629,7 +625,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -752,7 +747,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -852,7 +846,6 @@ public class TaskExecutorTest extends 
TestLogger { taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -884,7 +877,7 @@ public class TaskExecutorTest extends TestLogger { // the job leader should get the allocation id offered verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots( any(ResourceID.class), - (Collection<SlotOffer>)Matchers.argThat(contains(slotOffer)), + (Collection<SlotOffer>) Matchers.argThat(contains(slotOffer)), any(Time.class)); } finally { RpcUtils.terminateRpcEndpoint(taskManager, timeout); @@ -967,7 +960,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -1062,7 +1054,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -1169,7 +1160,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, heartbeatServicesMock, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -1243,7 +1233,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -1300,7 +1289,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -1393,7 +1381,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, new HeartbeatServices(heartbeatInterval, 10L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -1501,7 +1488,6 @@ public class TaskExecutorTest extends TestLogger { .build(), new HeartbeatServices(heartbeatInterval, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -1713,7 +1699,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java index 5fd12a8..a9e9949 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java @@ -34,7 +34,6 @@ import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.StackTraceSampleResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; -import org.apache.flink.types.SerializableOptional; import org.apache.flink.util.Preconditions; import java.util.concurrent.CompletableFuture; @@ -151,11 +150,6 
@@ public class TestingTaskExecutorGateway implements TaskExecutorGateway { } @Override - public CompletableFuture<SerializableOptional<String>> requestMetricQueryServiceAddress(Time timeout) { - return CompletableFuture.completedFuture(SerializableOptional.of(address)); - } - - @Override public String getAddress() { return address; } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala index e5c1668..d02a554 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala @@ -18,7 +18,7 @@ package org.apache.flink.runtime.akka -import java.net.{InetAddress, InetSocketAddress} +import java.net.InetSocketAddress import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException} import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution @@ -167,30 +167,4 @@ class AkkaUtilsTest akkaConfig.getString("akka.remote.netty.tcp.hostname") should equal(NetUtils.unresolvedHostToNormalizedString(hostname)) } - - test("null hostname should go to localhost") { - val configure = AkkaUtils.getAkkaConfig(new Configuration(), Some((null, 1772))) - - val hostname = configure.getString("akka.remote.netty.tcp.hostname") - - InetAddress.getByName(hostname).isLoopbackAddress should be(true) - } - - test("getAkkaConfig defaults to fork-join-executor") { - val akkaConfig = AkkaUtils.getAkkaConfig(new Configuration()) - - akkaConfig.getString("akka.actor.default-dispatcher.executor") should - equal("fork-join-executor") - } - - test("getAkkaConfig respects executor config") { - val akkaConfig = AkkaUtils.getAkkaConfig( - new Configuration(), - "localhost", - 1234, - AkkaUtils.getThreadPoolExecutorConfig) - - akkaConfig.getString("akka.actor.default-dispatcher.executor") should - equal("thread-pool-executor") - } }
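For readers skimming the diff above: the functional core of this revert is that the MetricQueryService is started on the ActorSystem backing the common AkkaRpcService again (instead of a dedicated "flink-metrics" ActorSystem), so the ResourceManager can derive each TaskManager's query-service actor path directly from the TaskExecutor's akka address rather than requesting it over RPC. The self-contained sketch below illustrates that path derivation only; the class name, sample address, and the constant value are illustrative assumptions, not the actual Flink code.

/**
 * Illustration only (not part of the commit): derive a TaskManager's
 * MetricQueryService actor path from the TaskExecutor's akka address,
 * as the reverted ResourceManager code does. The constant value is an
 * assumption mirroring MetricQueryService.METRIC_QUERY_SERVICE_NAME.
 */
public final class MetricQueryServicePathExample {

    private static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService";

    static String queryServicePath(String taskManagerAddress, String resourceId) {
        // The query service runs in the same ActorSystem as the TaskExecutor,
        // so only the final path element of the actor URL differs.
        return taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1)
            + METRIC_QUERY_SERVICE_NAME + '_' + resourceId;
    }

    public static void main(String[] args) {
        String tmAddress = "akka.tcp://flink@taskmanager-1:6122/user/taskmanager_0";
        System.out.println(queryServicePath(tmAddress, "2f7d4c8a90ab"));
        // prints: akka.tcp://flink@taskmanager-1:6122/user/MetricQueryService_2f7d4c8a90ab
    }
}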