This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 61330e47afe8960c804b8cd2edf2ba589b594d8b Author: Till Rohrmann <[email protected]> AuthorDate: Tue Oct 16 13:52:53 2018 +0200 Revert "[FLINK-10247] Run query service in separate ActorSystem" This reverts commit 9f593cd23d130b5a1266bd68ea5b34860c170c39. --- docs/_includes/generated/metric_configuration.html | 5 - .../apache/flink/configuration/MetricOptions.java | 12 -- .../apache/flink/types/SerializableOptional.java | 86 ------------- .../runtime/clusterframework/BootstrapTools.java | 143 +-------------------- .../runtime/entrypoint/ClusterEntrypoint.java | 17 +-- .../flink/runtime/metrics/util/MetricUtils.java | 18 --- .../flink/runtime/minicluster/MiniCluster.java | 53 +++----- .../runtime/resourcemanager/ResourceManager.java | 22 +--- .../runtime/rpc/akka/AkkaRpcServiceUtils.java | 5 +- .../flink/runtime/taskexecutor/TaskExecutor.java | 12 -- .../runtime/taskexecutor/TaskExecutorGateway.java | 8 -- .../runtime/taskexecutor/TaskManagerRunner.java | 24 +--- .../org/apache/flink/runtime/akka/AkkaUtils.scala | 123 ++++-------------- .../runtime/taskexecutor/TaskExecutorITCase.java | 1 - .../runtime/taskexecutor/TaskExecutorTest.java | 17 +-- .../taskexecutor/TestingTaskExecutorGateway.java | 6 - .../apache/flink/runtime/akka/AkkaUtilsTest.scala | 28 +--- 17 files changed, 66 insertions(+), 514 deletions(-) diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html index 5f52abd..0fe9d0c 100644 --- a/docs/_includes/generated/metric_configuration.html +++ b/docs/_includes/generated/metric_configuration.html @@ -8,11 +8,6 @@ </thead> <tbody> <tr> - <td><h5>metrics.internal.query-service.port</h5></td> - <td style="word-wrap: break-word;">"0"</td> - <td>The port range used for Flink's internal metric query service. Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. Per default Flink will pick a random port.</td> - </tr> - <tr> <td><h5>metrics.latency.granularity</h5></td> <td style="word-wrap: break-word;">"subtask"</td> <td>Defines the granularity of latency metrics. Accepted values are:<ul><li>single - Track latency without differentiating between sources and subtasks.</li><li>operator - Track latency while differentiating between sources, but not subtasks.</li><li>subtask - Track latency while differentiating between sources and subtasks.</li></ul></td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java index e76b7f2..fc6b3c1 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java @@ -130,18 +130,6 @@ public class MetricOptions { .defaultValue(128) .withDescription("Defines the number of measured latencies to maintain at each operator."); - /** - * The default network port range for Flink's internal metric query service. The {@code "0"} means that - * Flink searches for a free port. - */ - public static final ConfigOption<String> QUERY_SERVICE_PORT = - key("metrics.internal.query-service.port") - .defaultValue("0") - .withDescription("The port range used for Flink's internal metric query service. Accepts a list of ports " + - "(“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of " + - "ports to avoid collisions when multiple Flink components are running on the same machine. Per default " + - "Flink will pick a random port."); - private MetricOptions() { } } diff --git a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java deleted file mode 100644 index 89dcea4..0000000 --- a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.types; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -import java.io.Serializable; -import java.util.NoSuchElementException; -import java.util.Optional; -import java.util.function.Consumer; -import java.util.function.Function; - -/** - * Serializable {@link Optional}. - */ -public final class SerializableOptional<T extends Serializable> implements Serializable { - private static final long serialVersionUID = -3312769593551775940L; - - private static final SerializableOptional<?> EMPTY = new SerializableOptional<>(null); - - @Nullable - private final T value; - - private SerializableOptional(@Nullable T value) { - this.value = value; - } - - public T get() { - if (value == null) { - throw new NoSuchElementException("No value present"); - } - return value; - } - - public boolean isPresent() { - return value != null; - } - - public void ifPresent(Consumer<? super T> consumer) { - if (value != null) { - consumer.accept(value); - } - } - - public <R> Optional<R> map(Function<? super T, ? extends R> mapper) { - if (value == null) { - return Optional.empty(); - } else { - return Optional.ofNullable(mapper.apply(value)); - } - } - - public static <T extends Serializable> SerializableOptional<T> of(@Nonnull T value) { - return new SerializableOptional<>(value); - } - - public static <T extends Serializable> SerializableOptional<T> ofNullable(@Nullable T value) { - if (value == null) { - return empty(); - } else { - return of(value); - } - } - - @SuppressWarnings("unchecked") - public static <T extends Serializable> SerializableOptional<T> empty() { - return (SerializableOptional<T>) EMPTY; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index 00b6173..56e4576 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -46,7 +46,6 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; @@ -86,66 +85,13 @@ public class BootstrapTools { * @param portRangeDefinition The port range to choose a port from. * @param logger The logger to output log information. * @return The ActorSystem which has been started - * @throws Exception Thrown when actor system cannot be started in specified port range - */ - public static ActorSystem startActorSystem( - Configuration configuration, - String listeningAddress, - String portRangeDefinition, - Logger logger) throws Exception { - return startActorSystem( - configuration, - listeningAddress, - portRangeDefinition, - logger, - ActorSystemExecutorMode.FORK_JOIN_EXECUTOR); - } - - /** - * Starts an ActorSystem with the given configuration listening at the address/ports. - * - * @param configuration The Flink configuration - * @param listeningAddress The address to listen at. - * @param portRangeDefinition The port range to choose a port from. - * @param logger The logger to output log information. - * @param executorMode The executor mode of Akka actor system. - * @return The ActorSystem which has been started - * @throws Exception Thrown when actor system cannot be started in specified port range - */ - public static ActorSystem startActorSystem( - Configuration configuration, - String listeningAddress, - String portRangeDefinition, - Logger logger, - @Nonnull ActorSystemExecutorMode executorMode) throws Exception { - return startActorSystem( - configuration, - AkkaUtils.getFlinkActorSystemName(), - listeningAddress, - portRangeDefinition, - logger, - executorMode); - } - - /** - * Starts an ActorSystem with the given configuration listening at the address/ports. - * - * @param configuration The Flink configuration - * @param actorSystemName Name of the started {@link ActorSystem} - * @param listeningAddress The address to listen at. - * @param portRangeDefinition The port range to choose a port from. - * @param logger The logger to output log information. - * @param executorMode The executor mode of Akka actor system. - * @return The ActorSystem which has been started - * @throws Exception Thrown when actor system cannot be started in specified port range + * @throws Exception */ public static ActorSystem startActorSystem( Configuration configuration, - String actorSystemName, String listeningAddress, String portRangeDefinition, - Logger logger, - @Nonnull ActorSystemExecutorMode executorMode) throws Exception { + Logger logger) throws Exception { // parse port range definition and create port iterator Iterator<Integer> portsIterator; @@ -171,13 +117,7 @@ public class BootstrapTools { } try { - return startActorSystem( - configuration, - actorSystemName, - listeningAddress, - port, - logger, - executorMode); + return startActorSystem(configuration, listeningAddress, port, logger); } catch (Exception e) { // we can continue to try if this contains a netty channel exception @@ -196,7 +136,6 @@ public class BootstrapTools { /** * Starts an Actor System at a specific port. - * * @param configuration The Flink configuration. * @param listeningAddress The address to listen at. * @param listeningPort The port to listen at. @@ -205,56 +144,10 @@ public class BootstrapTools { * @throws Exception */ public static ActorSystem startActorSystem( - Configuration configuration, - String listeningAddress, - int listeningPort, - Logger logger) throws Exception { - return startActorSystem(configuration, listeningAddress, listeningPort, logger, ActorSystemExecutorMode.FORK_JOIN_EXECUTOR); - } - - /** - * Starts an Actor System at a specific port. - * @param configuration The Flink configuration. - * @param listeningAddress The address to listen at. - * @param listeningPort The port to listen at. - * @param logger the logger to output log information. - * @param executorMode The executor mode of Akka actor system. - * @return The ActorSystem which has been started. - * @throws Exception - */ - public static ActorSystem startActorSystem( Configuration configuration, String listeningAddress, int listeningPort, - Logger logger, - ActorSystemExecutorMode executorMode) throws Exception { - return startActorSystem( - configuration, - AkkaUtils.getFlinkActorSystemName(), - listeningAddress, - listeningPort, - logger, - executorMode); - } - - /** - * Starts an Actor System at a specific port. - * @param configuration The Flink configuration. - * @param actorSystemName Name of the started {@link ActorSystem} - * @param listeningAddress The address to listen at. - * @param listeningPort The port to listen at. - * @param logger the logger to output log information. - * @param executorMode The executor mode of Akka actor system. - * @return The ActorSystem which has been started. - * @throws Exception - */ - public static ActorSystem startActorSystem( - Configuration configuration, - String actorSystemName, - String listeningAddress, - int listeningPort, - Logger logger, - ActorSystemExecutorMode executorMode) throws Exception { + Logger logger) throws Exception { String hostPortUrl = NetUtils.unresolvedHostAndPortToNormalizedString(listeningAddress, listeningPort); logger.info("Trying to start actor system at {}", hostPortUrl); @@ -262,13 +155,12 @@ public class BootstrapTools { try { Config akkaConfig = AkkaUtils.getAkkaConfig( configuration, - new Some<>(new Tuple2<>(listeningAddress, listeningPort)), - getExecutorConfigByExecutorMode(configuration, executorMode) + new Some<>(new Tuple2<>(listeningAddress, listeningPort)) ); logger.debug("Using akka configuration\n {}", akkaConfig); - ActorSystem actorSystem = AkkaUtils.createActorSystem(actorSystemName, akkaConfig); + ActorSystem actorSystem = AkkaUtils.createActorSystem(akkaConfig); logger.info("Actor system started at {}", AkkaUtils.getAddress(actorSystem)); return actorSystem; @@ -278,24 +170,13 @@ public class BootstrapTools { Throwable cause = t.getCause(); if (cause != null && t.getCause() instanceof BindException) { throw new IOException("Unable to create ActorSystem at address " + hostPortUrl + - " : " + cause.getMessage(), t); + " : " + cause.getMessage(), t); } } throw new Exception("Could not create actor system", t); } } - private static Config getExecutorConfigByExecutorMode(Configuration configuration, ActorSystemExecutorMode executorMode) { - switch (executorMode) { - case FORK_JOIN_EXECUTOR: - return AkkaUtils.getForkJoinExecutorConfig(configuration); - case FIXED_THREAD_POOL_EXECUTOR: - return AkkaUtils.getThreadPoolExecutorConfig(); - default: - throw new IllegalArgumentException(String.format("Unknown ActorSystemExecutorMode %s.", executorMode)); - } - } - /** * Starts the web frontend. * @@ -623,14 +504,4 @@ public class BootstrapTools { return clonedConfiguration; } - - /** - * Options to specify which executor to use in an {@link ActorSystem}. - */ - public enum ActorSystemExecutorMode { - /** Used by default, use dispatcher with fork-join-executor. **/ - FORK_JOIN_EXECUTOR, - /** Use dispatcher with fixed thread pool executor. **/ - FIXED_THREAD_POOL_EXECUTOR - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 2a9baa9..fd0a0a1 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -96,8 +96,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import scala.concurrent.duration.FiniteDuration; -import static org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FORK_JOIN_EXECUTOR; - /** * Base class for the Flink cluster entry points. * @@ -157,9 +155,6 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { private WebMonitorEndpoint<?> webMonitorEndpoint; @GuardedBy("lock") - private ActorSystem metricQueryServiceActorSystem; - - @GuardedBy("lock") private ArchivedExecutionGraphStore archivedExecutionGraphStore; @GuardedBy("lock") @@ -281,9 +276,9 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { metricRegistry = createMetricRegistry(configuration); // TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint - // Start actor system for metric query service on any available port - metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(configuration, bindAddress, LOG); - metricRegistry.startQueryService(metricQueryServiceActorSystem, null); + // start the MetricQueryService + final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem(); + metricRegistry.startQueryService(actorSystem, null); archivedExecutionGraphStore = createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor()); @@ -401,7 +396,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { Configuration configuration, String bindAddress, String portRange) throws Exception { - ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG, FORK_JOIN_EXECUTOR); + ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG); FiniteDuration duration = AkkaUtils.getTimeout(configuration); return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit())); } @@ -469,10 +464,6 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { terminationFutures.add(metricRegistry.shutdown()); } - if (metricQueryServiceActorSystem != null) { - terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem)); - } - if (commonRpcService != null) { terminationFutures.add(commonRpcService.stopService()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java index 39e5f44..3fd268a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java @@ -18,11 +18,8 @@ package org.apache.flink.runtime.metrics.util; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; @@ -30,7 +27,6 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.Preconditions; -import akka.actor.ActorSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,15 +45,12 @@ import java.lang.management.MemoryMXBean; import java.lang.management.ThreadMXBean; import java.util.List; -import static org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FIXED_THREAD_POOL_EXECUTOR; - /** * Utility class to register pre-defined metric sets. */ public class MetricUtils { private static final Logger LOG = LoggerFactory.getLogger(MetricUtils.class); private static final String METRIC_GROUP_STATUS_NAME = "Status"; - private static final String METRICS_ACTOR_SYSTEM_NAME = "flink-metrics"; private MetricUtils() { } @@ -109,17 +102,6 @@ public class MetricUtils { instantiateCPUMetrics(jvm.addGroup("CPU")); } - public static ActorSystem startMetricsActorSystem(Configuration configuration, String hostname, Logger logger) throws Exception { - final String portRange = configuration.getString(MetricOptions.QUERY_SERVICE_PORT); - return BootstrapTools.startActorSystem( - configuration, - METRICS_ACTOR_SYSTEM_NAME, - hostname, - portRange, - logger, - FIXED_THREAD_POOL_EXECUTOR); - } - private static void instantiateNetworkMetrics( MetricGroup metrics, final NetworkEnvironment network) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index bcd4c7b..4bfdb25 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -134,9 +134,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { private RpcService resourceManagerRpcService; @GuardedBy("lock") - private ActorSystem metricQueryServiceActorSystem; - - @GuardedBy("lock") private HighAvailabilityServices haServices; @GuardedBy("lock") @@ -255,11 +252,8 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { commonRpcService = createRpcService(configuration, rpcTimeout, false, null); // TODO: Temporary hack until the metric query service is ported to the RpcEndpoint - metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem( - configuration, - commonRpcService.getAddress(), - LOG); - metricRegistry.startQueryService(metricQueryServiceActorSystem, null); + final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem(); + metricRegistry.startQueryService(actorSystem, null); if (useSingleRpcService) { for (int i = 0; i < numTaskManagers; i++) { @@ -356,7 +350,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1), "DispatcherRestEndpoint"), new AkkaQueryServiceRetriever( - metricQueryServiceActorSystem, + actorSystem, Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))), haServices.getWebMonitorLeaderElectionService(), new ShutDownFatalErrorHandler()); @@ -453,12 +447,24 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { final FutureUtils.ConjunctFuture<Void> componentsTerminationFuture = FutureUtils.completeAll(componentTerminationFutures); - final CompletableFuture<Void> metricSystemTerminationFuture = FutureUtils.composeAfterwards( + final CompletableFuture<Void> metricRegistryTerminationFuture = FutureUtils.runAfterwards( componentsTerminationFuture, - this::closeMetricSystem); + () -> { + synchronized (lock) { + if (jobManagerMetricGroup != null) { + jobManagerMetricGroup.close(); + jobManagerMetricGroup = null; + } + // metrics shutdown + if (metricRegistry != null) { + metricRegistry.shutdown(); + metricRegistry = null; + } + } + }); // shut down the RpcServices - final CompletableFuture<Void> rpcServicesTerminationFuture = metricSystemTerminationFuture + final CompletableFuture<Void> rpcServicesTerminationFuture = metricRegistryTerminationFuture .thenCompose((Void ignored) -> terminateRpcServices()); final CompletableFuture<Void> remainingServicesTerminationFuture = FutureUtils.runAfterwards( @@ -482,29 +488,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { } } - private CompletableFuture<Void> closeMetricSystem() { - synchronized (lock) { - if (jobManagerMetricGroup != null) { - jobManagerMetricGroup.close(); - jobManagerMetricGroup = null; - } - - final ArrayList<CompletableFuture<Void>> terminationFutures = new ArrayList<>(2); - - // metrics shutdown - if (metricRegistry != null) { - terminationFutures.add(metricRegistry.shutdown()); - metricRegistry = null; - } - - if (metricQueryServiceActorSystem != null) { - terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem)); - } - - return FutureUtils.completeAll(terminationFutures); - } - } - // ------------------------------------------------------------------------ // Accessing jobs // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index a3a075d..d78e346 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -48,6 +48,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; @@ -75,13 +76,11 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -594,26 +593,19 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> @Override public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) { - final ArrayList<CompletableFuture<Optional<Tuple2<ResourceID, String>>>> metricQueryServicePathFutures = new ArrayList<>(taskExecutors.size()); + final ArrayList<Tuple2<ResourceID, String>> metricQueryServicePaths = new ArrayList<>(taskExecutors.size()); for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> workerRegistrationEntry : taskExecutors.entrySet()) { final ResourceID tmResourceId = workerRegistrationEntry.getKey(); final WorkerRegistration<WorkerType> workerRegistration = workerRegistrationEntry.getValue(); - final TaskExecutorGateway taskExecutorGateway = workerRegistration.getTaskExecutorGateway(); + final String taskManagerAddress = workerRegistration.getTaskExecutorGateway().getAddress(); + final String tmMetricQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + + MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + tmResourceId.getResourceIdString(); - final CompletableFuture<Optional<Tuple2<ResourceID, String>>> metricQueryServicePathFuture = taskExecutorGateway - .requestMetricQueryServiceAddress(timeout) - .thenApply(optional -> optional.map(path -> Tuple2.of(tmResourceId, path))); - - metricQueryServicePathFutures.add(metricQueryServicePathFuture); + metricQueryServicePaths.add(Tuple2.of(tmResourceId, tmMetricQueryServicePath)); } - return FutureUtils.combineAll(metricQueryServicePathFutures).thenApply( - collection -> collection - .stream() - .filter(Optional::isPresent) - .map(Optional::get) - .collect(Collectors.toList())); + return CompletableFuture.completedFuture(metricQueryServicePaths); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java index 3ee7641..3a62698 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java @@ -70,10 +70,7 @@ public class AkkaRpcServiceUtils { * @throws IOException Thrown, if the actor system can not bind to the address * @throws Exception Thrown is some other error occurs while creating akka actor system */ - public static RpcService createRpcService( - String hostname, - int port, - Configuration configuration) throws Exception { + public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception { LOG.info("Starting AkkaRpcService at {}.", NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port)); final ActorSystem actorSystem; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index f90e939..33db7e1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -102,7 +102,6 @@ import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.types.SerializableOptional; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -158,10 +157,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { private final BlobCacheService blobCacheService; - /** The path to metric query service on this Task Manager. */ - @Nullable - private final String metricQueryServicePath; - // --------- TaskManager services -------- /** The connection information of this task manager. */ @@ -216,7 +211,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { TaskManagerServices taskExecutorServices, HeartbeatServices heartbeatServices, TaskManagerMetricGroup taskManagerMetricGroup, - @Nullable String metricQueryServicePath, BlobCacheService blobCacheService, FatalErrorHandler fatalErrorHandler) { @@ -230,7 +224,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { this.fatalErrorHandler = checkNotNull(fatalErrorHandler); this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup); this.blobCacheService = checkNotNull(blobCacheService); - this.metricQueryServicePath = metricQueryServicePath; this.taskSlotTable = taskExecutorServices.getTaskSlotTable(); this.jobManagerTable = taskExecutorServices.getJobManagerTable(); @@ -854,11 +847,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } } - @Override - public CompletableFuture<SerializableOptional<String>> requestMetricQueryServiceAddress(Time timeout) { - return CompletableFuture.completedFuture(SerializableOptional.ofNullable(metricQueryServicePath)); - } - // ---------------------------------------------------------------------- // Disconnection RPCs // ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index d6b9e15..4f79289 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -36,7 +36,6 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.taskmanager.Task; -import org.apache.flink.types.SerializableOptional; import java.util.concurrent.CompletableFuture; @@ -196,11 +195,4 @@ public interface TaskExecutorGateway extends RpcGateway { * @return Future which is completed with the {@link TransientBlobKey} of the uploaded file. */ CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType, @RpcTimeout Time timeout); - - /** - * Returns the fully qualified address of Metric Query Service on the TaskManager. - * - * @return Future String with Fully qualified (RPC) address of Metric Query Service on the TaskManager. - */ - CompletableFuture<SerializableOptional<String>> requestMetricQueryServiceAddress(@RpcTimeout Time timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index f830ae1..42fe5bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.util.MetricUtils; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; @@ -100,8 +101,6 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync private final RpcService rpcService; - private final ActorSystem metricQueryServiceActorSystem; - private final HighAvailabilityServices highAvailabilityServices; private final MetricRegistryImpl metricRegistry; @@ -133,14 +132,14 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION); rpcService = createRpcService(configuration, highAvailabilityServices); - metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(configuration, rpcService.getAddress(), LOG); HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration); metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration)); // TODO: Temporary hack until the MetricQueryService has been ported to RpcEndpoint - metricRegistry.startQueryService(metricQueryServiceActorSystem, resourceId); + final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem(); + metricRegistry.startQueryService(actorSystem, resourceId); blobCacheService = new BlobCacheService( configuration, highAvailabilityServices.createBlobStore(), null @@ -160,7 +159,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync this.terminationFuture = new CompletableFuture<>(); this.shutdown = false; - MemoryLogger.startIfConfigured(LOG, configuration, metricQueryServiceActorSystem); + MemoryLogger.startIfConfigured(LOG, configuration, actorSystem); } // -------------------------------------------------------------------------------------------- @@ -215,10 +214,6 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync exception = ExceptionUtils.firstOrSuppressed(e, exception); } - if (metricQueryServiceActorSystem != null) { - terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem)); - } - try { highAvailabilityServices.close(); } catch (Exception e) { @@ -378,8 +373,6 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); - String metricQueryServicePath = metricRegistry.getMetricQueryServicePath(); - return new TaskExecutor( rpcService, taskManagerConfiguration, @@ -387,7 +380,6 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync taskManagerServices, heartbeatServices, taskManagerMetricGroup, - metricQueryServicePath, blobCacheService, fatalErrorHandler); } @@ -424,14 +416,6 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT); - return bindWithPort(configuration, taskManagerHostname, portRangeDefinition); - } - - private static RpcService bindWithPort( - Configuration configuration, - String taskManagerHostname, - String portRangeDefinition) throws Exception{ - // parse port range definition and create port iterator Iterator<Integer> portsIterator; try { diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 2b0c939..dcf0fdd 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -50,12 +50,6 @@ object AkkaUtils { val INF_TIMEOUT: FiniteDuration = 21474835 seconds - val FLINK_ACTOR_SYSTEM_NAME = "flink" - - def getFlinkActorSystemName = { - FLINK_ACTOR_SYSTEM_NAME - } - /** * Creates a local actor system without remoting. * @@ -109,19 +103,9 @@ object AkkaUtils { * @return created actor system */ def createActorSystem(akkaConfig: Config): ActorSystem = { - createActorSystem(FLINK_ACTOR_SYSTEM_NAME, akkaConfig) - } - - /** - * Creates an actor system with the given akka config. - * - * @param akkaConfig configuration for the actor system - * @return created actor system - */ - def createActorSystem(actorSystemName: String, akkaConfig: Config): ActorSystem = { // Initialize slf4j as logger of Akka's Netty instead of java.util.logging (FLINK-1650) InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory) - ActorSystem.create(actorSystemName, akkaConfig) + ActorSystem.create("flink", akkaConfig) } /** @@ -135,23 +119,7 @@ object AkkaUtils { } /** - * Returns a remote Akka config for the given configuration values. - * - * @param configuration containing the user provided configuration values - * @param hostname to bind against. If null, then the loopback interface is used - * @param port to bind against - * @param executorMode containing the user specified mode of executor - * @return A remote Akka config - */ - def getAkkaConfig(configuration: Configuration, - hostname: String, - port: Int, - executorConfig: Config): Config = { - getAkkaConfig(configuration, Some((hostname, port)), executorConfig) - } - - /** - * Returns a remote Akka config for the given configuration values. + * Return a remote Akka config for the given configuration values. * * @param configuration containing the user provided configuration values * @param hostname to bind against. If null, then the loopback interface is used @@ -187,25 +155,7 @@ object AkkaUtils { @throws(classOf[UnknownHostException]) def getAkkaConfig(configuration: Configuration, externalAddress: Option[(String, Int)]): Config = { - getAkkaConfig(configuration, externalAddress, getForkJoinExecutorConfig(configuration)) - } - - /** - * Creates an akka config with the provided configuration values. If the listening address is - * specified, then the actor system will listen on the respective address. - * - * @param configuration instance containing the user provided configuration values - * @param externalAddress optional tuple of bindAddress and port to be reachable at. - * If None is given, then an Akka config for local actor system - * will be returned - * @param executorConfig config defining the used executor by the default dispatcher - * @return Akka config - */ - @throws(classOf[UnknownHostException]) - def getAkkaConfig(configuration: Configuration, - externalAddress: Option[(String, Int)], - executorConfig: Config): Config = { - val defaultConfig = getBasicAkkaConfig(configuration).withFallback(executorConfig) + val defaultConfig = getBasicAkkaConfig(configuration) externalAddress match { @@ -257,6 +207,24 @@ object AkkaUtils { val supervisorStrategy = classOf[StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy] .getCanonicalName + val forkJoinExecutorParallelismFactor = + configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR) + + val forkJoinExecutorParallelismMin = + configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN) + + val forkJoinExecutorParallelismMax = + configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX) + + val forkJoinExecutorConfig = + s""" + | fork-join-executor { + | parallelism-factor = $forkJoinExecutorParallelismFactor + | parallelism-min = $forkJoinExecutorParallelismMin + | parallelism-max = $forkJoinExecutorParallelismMax + | } + """.stripMargin + val config = s""" |akka { @@ -283,6 +251,8 @@ object AkkaUtils { | | default-dispatcher { | throughput = $akkaThroughput + | + | $forkJoinExecutorConfig | } | } |} @@ -291,53 +261,6 @@ object AkkaUtils { ConfigFactory.parseString(config) } - def getThreadPoolExecutorConfig: Config = { - val configString = s""" - |akka { - | actor { - | default-dispatcher { - | executor = "thread-pool-executor" - | thread-pool-executor { - | core-pool-size-min = 2 - | core-pool-size-factor = 2.0 - | core-pool-size-max = 4 - | } - | } - | } - |} - """. - stripMargin - - ConfigFactory.parseString(configString) - } - - def getForkJoinExecutorConfig(configuration: Configuration): Config = { - val forkJoinExecutorParallelismFactor = - configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR) - - val forkJoinExecutorParallelismMin = - configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN) - - val forkJoinExecutorParallelismMax = - configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX) - - val configString = s""" - |akka { - | actor { - | default-dispatcher { - | executor = "fork-join-executor" - | fork-join-executor { - | parallelism-factor = $forkJoinExecutorParallelismFactor - | parallelism-min = $forkJoinExecutorParallelismMin - | parallelism-max = $forkJoinExecutorParallelismMax - | } - | } - | } - |}""".stripMargin - - ConfigFactory.parseString(configString) - } - def testDispatcherConfig: Config = { val config = s""" diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 3ee1b92..6e0f9c5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -165,7 +165,6 @@ public class TaskExecutorITCase extends TestLogger { taskManagerServices, heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, new BlobCacheService( configuration, new VoidBlobStore(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 179dea2..4af0529 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -279,7 +279,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -370,7 +369,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -486,7 +484,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -565,7 +562,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -629,7 +625,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -752,7 +747,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -852,7 +846,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -884,7 +877,7 @@ public class TaskExecutorTest extends TestLogger { // the job leader should get the allocation id offered verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots( any(ResourceID.class), - (Collection<SlotOffer>)Matchers.argThat(contains(slotOffer)), + (Collection<SlotOffer>) Matchers.argThat(contains(slotOffer)), any(Time.class)); } finally { RpcUtils.terminateRpcEndpoint(taskManager, timeout); @@ -967,7 +960,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -1062,7 +1054,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -1169,7 +1160,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, heartbeatServicesMock, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -1243,7 +1233,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -1300,7 +1289,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -1393,7 +1381,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, new HeartbeatServices(heartbeatInterval, 10L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -1501,7 +1488,6 @@ public class TaskExecutorTest extends TestLogger { .build(), new HeartbeatServices(heartbeatInterval, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); @@ -1713,7 +1699,6 @@ public class TaskExecutorTest extends TestLogger { taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, dummyBlobCacheService, testingFatalErrorHandler); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java index 5fd12a8..a9e9949 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java @@ -34,7 +34,6 @@ import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.StackTraceSampleResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; -import org.apache.flink.types.SerializableOptional; import org.apache.flink.util.Preconditions; import java.util.concurrent.CompletableFuture; @@ -151,11 +150,6 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway { } @Override - public CompletableFuture<SerializableOptional<String>> requestMetricQueryServiceAddress(Time timeout) { - return CompletableFuture.completedFuture(SerializableOptional.of(address)); - } - - @Override public String getAddress() { return address; } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala index e5c1668..d02a554 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala @@ -18,7 +18,7 @@ package org.apache.flink.runtime.akka -import java.net.{InetAddress, InetSocketAddress} +import java.net.InetSocketAddress import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException} import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution @@ -167,30 +167,4 @@ class AkkaUtilsTest akkaConfig.getString("akka.remote.netty.tcp.hostname") should equal(NetUtils.unresolvedHostToNormalizedString(hostname)) } - - test("null hostname should go to localhost") { - val configure = AkkaUtils.getAkkaConfig(new Configuration(), Some((null, 1772))) - - val hostname = configure.getString("akka.remote.netty.tcp.hostname") - - InetAddress.getByName(hostname).isLoopbackAddress should be(true) - } - - test("getAkkaConfig defaults to fork-join-executor") { - val akkaConfig = AkkaUtils.getAkkaConfig(new Configuration()) - - akkaConfig.getString("akka.actor.default-dispatcher.executor") should - equal("fork-join-executor") - } - - test("getAkkaConfig respects executor config") { - val akkaConfig = AkkaUtils.getAkkaConfig( - new Configuration(), - "localhost", - 1234, - AkkaUtils.getThreadPoolExecutorConfig) - - akkaConfig.getString("akka.actor.default-dispatcher.executor") should - equal("thread-pool-executor") - } }
