[FLINK-4505] [cluster mngt] Implement TaskManager component's startup The TaskManagerRunner now contains the startup logic for the TaskManager's components.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c34f13c2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c34f13c2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c34f13c2 Branch: refs/heads/flip-6 Commit: c34f13c26bf215fbde6c3f28547f1a31ca6cbc99 Parents: b8c6b99 Author: æ·æ± <taojiang....@alibaba-inc.com> Authored: Fri Sep 2 18:00:49 2016 +0800 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Oct 14 15:14:41 2016 +0200 ---------------------------------------------------------------------- .../runtime/taskexecutor/TaskExecutor.java | 766 +------------------ .../runtime/taskmanager/TaskManagerRunner.java | 749 ++++++++++++++++++ .../runtime/taskexecutor/TaskExecutorTest.java | 53 +- 3 files changed, 804 insertions(+), 764 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c34f13c2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 9d9ad2a..8ce2780 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -18,74 +18,29 @@ package org.apache.flink.runtime.taskexecutor; -import akka.actor.ActorSystem; -import com.typesafe.config.Config; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.io.network.ConnectionManager; -import org.apache.flink.runtime.io.network.LocalConnectionManager; -import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; -import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; -import org.apache.flink.runtime.query.netty.KvStateServer; import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered; import org.apache.flink.runtime.resourcemanager.SlotRequestReply; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.util.Preconditions; import org.jboss.netty.channel.ChannelException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.core.memory.HeapMemorySegment; -import org.apache.flink.core.memory.HybridMemorySegment; -import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; -import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.io.network.netty.NettyConfig; -import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.akka.AkkaRpcService; -import org.apache.flink.runtime.taskmanager.MemoryLogger; -import org.apache.flink.runtime.util.EnvironmentInformation; -import org.apache.flink.runtime.util.LeaderRetrievalUtils; -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; -import org.apache.flink.util.MathUtils; -import org.apache.flink.util.NetUtils; - -import scala.Tuple2; -import scala.Option; -import scala.Some; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; - -import java.io.File; -import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.util.UUID; -import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -98,12 +53,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class); - /** The unique resource ID of this TaskExecutor */ - private final ResourceID resourceID; - + /** The connection information of this task manager */ private final TaskManagerLocation taskManagerLocation; - /** The access to the leader election and metadata storage services */ + /** The access to the leader election and retrieval services */ private final HighAvailabilityServices haServices; /** The task manager configuration */ @@ -128,28 +81,26 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { // ------------------------------------------------------------------------ public TaskExecutor( - TaskExecutorConfiguration taskExecutorConfig, - ResourceID resourceID, - TaskManagerLocation taskManagerLocation, - MemoryManager memoryManager, - IOManager ioManager, - NetworkEnvironment networkEnvironment, - RpcService rpcService, - HighAvailabilityServices haServices) { + TaskExecutorConfiguration taskExecutorConfig, + TaskManagerLocation taskManagerLocation, + RpcService rpcService, + MemoryManager memoryManager, + IOManager ioManager, + NetworkEnvironment networkEnvironment, + HighAvailabilityServices haServices) { super(rpcService); checkArgument(taskExecutorConfig.getNumberOfSlots() > 0, "The number of slots has to be larger than 0."); this.taskExecutorConfig = checkNotNull(taskExecutorConfig); - this.resourceID = checkNotNull(resourceID); this.taskManagerLocation = checkNotNull(taskManagerLocation); this.memoryManager = checkNotNull(memoryManager); this.ioManager = checkNotNull(ioManager); this.networkEnvironment = checkNotNull(networkEnvironment); this.haServices = checkNotNull(haServices); - this.numberOfSlots = taskExecutorConfig.getNumberOfSlots(); + this.numberOfSlots = taskExecutorConfig.getNumberOfSlots(); } // ------------------------------------------------------------------------ @@ -207,7 +158,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } } - /** * Requests a slot from the TaskManager * * @param allocationID id for the request @@ -220,126 +170,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } /** - * Starts and runs the TaskManager. - * <p/> - * This method first tries to select the network interface to use for the TaskManager - * communication. The network interface is used both for the actor communication - * (coordination) as well as for the data exchange between task managers. Unless - * the hostname/interface is explicitly configured in the configuration, this - * method will try out various interfaces and methods to connect to the JobManager - * and select the one where the connection attempt is successful. - * <p/> - * After selecting the network interface, this method brings up an actor system - * for the TaskManager and its actors, starts the TaskManager's services - * (library cache, shuffle network stack, ...), and starts the TaskManager itself. - * - * @param configuration The configuration for the TaskManager. - * @param resourceID The id of the resource which the task manager will run on. - */ - public static void selectNetworkInterfaceAndRunTaskManager( - Configuration configuration, - ResourceID resourceID) throws Exception { - - final InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration); - - runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration); - } - - private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration) - throws Exception { - String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null); - if (taskManagerHostname != null) { - LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname); - } else { - LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration); - FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration); - - InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, lookupTimeout); - taskManagerHostname = taskManagerAddress.getHostName(); - LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.", - taskManagerHostname, taskManagerAddress.getHostAddress()); - } - - // if no task manager port has been configured, use 0 (system will pick any free port) - final int actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0); - if (actorSystemPort < 0 || actorSystemPort > 65535) { - throw new IllegalConfigurationException("Invalid value for '" + - ConfigConstants.TASK_MANAGER_IPC_PORT_KEY + - "' (port for the TaskManager actor system) : " + actorSystemPort + - " - Leave config parameter empty or use 0 to let the system choose a port automatically."); - } - - return new InetSocketAddress(taskManagerHostname, actorSystemPort); - } - - /** - * Starts and runs the TaskManager. Brings up an actor system for the TaskManager and its - * actors, starts the TaskManager's services (library cache, shuffle network stack, ...), - * and starts the TaskManager itself. - * <p/> - * This method will also spawn a process reaper for the TaskManager (kill the process if - * the actor fails) and optionally start the JVM memory logging thread. - * - * @param taskManagerHostname The hostname/address of the interface where the actor system - * will communicate. - * @param resourceID The id of the resource which the task manager will run on. - * @param actorSystemPort The port at which the actor system will communicate. - * @param configuration The configuration for the TaskManager. - */ - private static void runTaskManager( - String taskManagerHostname, - ResourceID resourceID, - int actorSystemPort, - final Configuration configuration) throws Exception { - - LOG.info("Starting TaskManager"); - - // Bring up the TaskManager actor system first, bind it to the given address. - - LOG.info("Starting TaskManager actor system at " + - NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort)); - - final ActorSystem taskManagerSystem; - try { - Tuple2<String, Object> address = new Tuple2<String, Object>(taskManagerHostname, actorSystemPort); - Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, new Some<>(address)); - LOG.debug("Using akka configuration\n " + akkaConfig); - taskManagerSystem = AkkaUtils.createActorSystem(akkaConfig); - } catch (Throwable t) { - if (t instanceof ChannelException) { - Throwable cause = t.getCause(); - if (cause != null && t.getCause() instanceof BindException) { - String address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort); - throw new IOException("Unable to bind TaskManager actor system to address " + - address + " - " + cause.getMessage(), t); - } - } - throw new Exception("Could not create TaskManager actor system", t); - } - - // start akka rpc service based on actor system - final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis()); - final AkkaRpcService akkaRpcService = new AkkaRpcService(taskManagerSystem, timeout); - - // start high availability service to implement getResourceManagerLeaderRetriever method only - final HighAvailabilityServices haServices = new HighAvailabilityServices() { - @Override - public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { - return LeaderRetrievalUtils.createLeaderRetrievalService(configuration); - } - - @Override public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception { return null; } @Override - public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { - return null; - } - - @Override - public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception { return null; } @@ -350,552 +185,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { @Override public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception { - return null; - } - }; - - // start all the TaskManager services (network stack, library cache, ...) - // and the TaskManager actor - try { - LOG.info("Starting TaskManager actor"); - TaskExecutor taskExecutor = startTaskManagerComponentsAndActor( - configuration, - resourceID, - akkaRpcService, - taskManagerHostname, - haServices, - false); - - taskExecutor.start(); - - // if desired, start the logging daemon that periodically logs the memory usage information - if (LOG.isInfoEnabled() && configuration.getBoolean( - ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD, - ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) { - LOG.info("Starting periodic memory usage logger"); - - long interval = configuration.getLong( - ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS, - ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS); - - MemoryLogger logger = new MemoryLogger(LOG, interval, taskManagerSystem); - logger.start(); - } - - // block until everything is done - taskManagerSystem.awaitTermination(); - } catch (Throwable t) { - LOG.error("Error while starting up taskManager", t); - try { - taskManagerSystem.shutdown(); - } catch (Throwable tt) { - LOG.warn("Could not cleanly shut down actor system", tt); - } - throw t; - } - } - - // -------------------------------------------------------------------------- - // Starting and running the TaskManager - // -------------------------------------------------------------------------- - - /** - * @param configuration The configuration for the TaskManager. - * @param resourceID The id of the resource which the task manager will run on. - * @param rpcService The rpc service which is used to start and connect to the TaskManager RpcEndpoint . - * @param taskManagerHostname The hostname/address that describes the TaskManager's data location. - * @param haServices Optionally, a high availability service can be provided. If none is given, - * then a HighAvailabilityServices is constructed from the configuration. - * @param localTaskManagerCommunication If true, the TaskManager will not initiate the TCP network stack. - * @return An ActorRef to the TaskManager actor. - * @throws IllegalConfigurationException Thrown, if the given config contains illegal values. - * @throws IOException Thrown, if any of the I/O components (such as buffer pools, - * I/O manager, ...) cannot be properly started. - * @throws Exception Thrown is some other error occurs while parsing the configuration - * or starting the TaskManager components. - */ - public static TaskExecutor startTaskManagerComponentsAndActor( - Configuration configuration, - ResourceID resourceID, - RpcService rpcService, - String taskManagerHostname, - HighAvailabilityServices haServices, - boolean localTaskManagerCommunication) throws Exception { - - final TaskExecutorConfiguration taskExecutorConfig = parseTaskManagerConfiguration( - configuration, taskManagerHostname, localTaskManagerCommunication); - - TaskManagerComponents taskManagerComponents = createTaskManagerComponents( - resourceID, - InetAddress.getByName(taskManagerHostname), - taskExecutorConfig, - configuration); - - final TaskExecutor taskExecutor = new TaskExecutor( - taskExecutorConfig, - resourceID, - taskManagerComponents.getTaskManagerLocation(), - taskManagerComponents.getMemoryManager(), - taskManagerComponents.getIOManager(), - taskManagerComponents.getNetworkEnvironment(), - rpcService, - haServices); - - return taskExecutor; - } - - /** - * Creates and returns the task manager components. - * - * @param resourceID resource ID of the task manager - * @param taskManagerAddress address of the task manager - * @param taskExecutorConfig task manager configuration - * @param configuration of Flink - * @return task manager components - * @throws Exception - */ - private static TaskExecutor.TaskManagerComponents createTaskManagerComponents( - ResourceID resourceID, - InetAddress taskManagerAddress, - TaskExecutorConfiguration taskExecutorConfig, - Configuration configuration) throws Exception { - MemoryType memType = taskExecutorConfig.getNetworkConfig().memoryType(); - - // pre-start checks - checkTempDirs(taskExecutorConfig.getTmpDirPaths()); - - NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskExecutorConfig.getNetworkConfig(); - - NetworkBufferPool networkBufferPool = new NetworkBufferPool( - networkEnvironmentConfiguration.numNetworkBuffers(), - networkEnvironmentConfiguration.networkBufferSize(), - networkEnvironmentConfiguration.memoryType()); - - ConnectionManager connectionManager; - - if (networkEnvironmentConfiguration.nettyConfig().isDefined()) { - connectionManager = new NettyConnectionManager(networkEnvironmentConfiguration.nettyConfig().get()); - } else { - connectionManager = new LocalConnectionManager(); - } - - ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); - TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher(); - - KvStateRegistry kvStateRegistry = new KvStateRegistry(); - - KvStateServer kvStateServer; - - if (networkEnvironmentConfiguration.nettyConfig().isDefined()) { - NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig().get(); - - int numNetworkThreads = networkEnvironmentConfiguration.queryServerNetworkThreads() == 0 ? - nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerNetworkThreads(); - - int numQueryThreads = networkEnvironmentConfiguration.queryServerQueryThreads() == 0 ? - nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerQueryThreads(); - - kvStateServer = new KvStateServer( - taskManagerAddress, - networkEnvironmentConfiguration.queryServerPort(), - numNetworkThreads, - numQueryThreads, - kvStateRegistry, - new DisabledKvStateRequestStats()); - } else { - kvStateServer = null; - } - - // we start the network first, to make sure it can allocate its buffers first - final NetworkEnvironment network = new NetworkEnvironment( - networkBufferPool, - connectionManager, - resultPartitionManager, - taskEventDispatcher, - kvStateRegistry, - kvStateServer, - networkEnvironmentConfiguration.ioMode(), - networkEnvironmentConfiguration.partitionRequestInitialBackoff(), - networkEnvironmentConfiguration.partitinRequestMaxBackoff()); - - network.start(); - - TaskManagerLocation taskManagerLocation = new TaskManagerLocation( - resourceID, - taskManagerAddress, - network.getConnectionManager().getDataPort()); - - // computing the amount of memory to use depends on how much memory is available - // it strictly needs to happen AFTER the network stack has been initialized - - // check if a value has been configured - long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L); - checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory, - ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, - "MemoryManager needs at least one MB of memory. " + - "If you leave this config parameter empty, the system automatically " + - "pick a fraction of the available memory."); - - final long memorySize; - boolean preAllocateMemory = configuration.getBoolean( - ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE); - if (configuredMemory > 0) { - if (preAllocateMemory) { - LOG.info("Using {} MB for managed memory." , configuredMemory); - } else { - LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory); - } - memorySize = configuredMemory << 20; // megabytes to bytes - } else { - float fraction = configuration.getFloat( - ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, - ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION); - checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction, - ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, - "MemoryManager fraction of the free memory must be between 0.0 and 1.0"); - - if (memType == MemoryType.HEAP) { - long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction); - if (preAllocateMemory) { - LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." , - fraction , relativeMemSize >> 20); - } else { - LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " + - "memory will be allocated lazily." , fraction , relativeMemSize >> 20); - } - memorySize = relativeMemSize; - } else if (memType == MemoryType.OFF_HEAP) { - // The maximum heap memory has been adjusted according to the fraction - long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory(); - long directMemorySize = (long) (maxMemory / (1.0 - fraction) * fraction); - if (preAllocateMemory) { - LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." , - fraction, directMemorySize >> 20); - } else { - LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," + - " memory will be allocated lazily.", fraction, directMemorySize >> 20); - } - memorySize = directMemorySize; - } else { - throw new RuntimeException("No supported memory type detected."); - } - } - - // now start the memory manager - final MemoryManager memoryManager; - try { - memoryManager = new MemoryManager( - memorySize, - taskExecutorConfig.getNumberOfSlots(), - taskExecutorConfig.getNetworkConfig().networkBufferSize(), - memType, - preAllocateMemory); - } catch (OutOfMemoryError e) { - if (memType == MemoryType.HEAP) { - throw new Exception("OutOfMemory error (" + e.getMessage() + - ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e); - } else if (memType == MemoryType.OFF_HEAP) { - throw new Exception("OutOfMemory error (" + e.getMessage() + - ") while allocating the TaskManager off-heap memory (" + memorySize + - " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e); - } else { - throw e; - } - } - - // start the I/O manager, it will create some temp directories. - final IOManager ioManager = new IOManagerAsync(taskExecutorConfig.getTmpDirPaths()); - - return new TaskExecutor.TaskManagerComponents(taskManagerLocation, memoryManager, ioManager, network); - } - - // -------------------------------------------------------------------------- - // Parsing and checking the TaskManager Configuration - // -------------------------------------------------------------------------- - - /** - * Utility method to extract TaskManager config parameters from the configuration and to - * sanity check them. - * - * @param configuration The configuration. - * @param taskManagerHostname The host name under which the TaskManager communicates. - * @param localTaskManagerCommunication True, to skip initializing the network stack. - * Use only in cases where only one task manager runs. - * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc. - */ - private static TaskExecutorConfiguration parseTaskManagerConfiguration( - Configuration configuration, - String taskManagerHostname, - boolean localTaskManagerCommunication) throws Exception { - - // ------- read values from the config and check them --------- - // (a lot of them) - - // ----> hosts / ports for communication and data exchange - - int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT); - if (dataport == 0) { - dataport = NetUtils.getAvailablePort(); - } - checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, - "Leave config parameter empty or use 0 to let the system choose a port automatically."); - - InetAddress taskManagerAddress = InetAddress.getByName(taskManagerHostname); - final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport); - - // ----> memory / network stack (shuffles/broadcasts), task slots, temp directories - - // we need this because many configs have been written with a "-1" entry - int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); - if (slots == -1) { - slots = 1; - } - checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, - "Number of task slots must be at least one."); - - final int numNetworkBuffers = configuration.getInteger( - ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS); - checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers, - ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, ""); - - final int pageSize = configuration.getInteger( - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE); - // check page size of for minimum size - checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize, - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, - "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE); - // check page size for power of two - checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize, - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, - "Memory segment size must be a power of 2."); - - // check whether we use heap or off-heap memory - final MemoryType memType; - if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) { - memType = MemoryType.OFF_HEAP; - } else { - memType = MemoryType.HEAP; - } - - // initialize the memory segment factory accordingly - if (memType == MemoryType.HEAP) { - if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) { - throw new Exception("Memory type is set to heap memory, but memory segment " + - "factory has been initialized for off-heap memory segments"); - } - } else { - if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) { - throw new Exception("Memory type is set to off-heap memory, but memory segment " + - "factory has been initialized for heap memory segments"); - } - } - - final String[] tmpDirs = configuration.getString( - ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator); - - final NettyConfig nettyConfig; - if (!localTaskManagerCommunication) { - nettyConfig = new NettyConfig( - taskManagerInetSocketAddress.getAddress(), - taskManagerInetSocketAddress.getPort(), - pageSize, - slots, - configuration); - } else { - nettyConfig = null; - } - - // Default spill I/O mode for intermediate results - final String syncOrAsync = configuration.getString( - ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE, - ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE); - - final IOMode ioMode; - if (syncOrAsync.equals("async")) { - ioMode = IOManager.IOMode.ASYNC; - } else { - ioMode = IOManager.IOMode.SYNC; - } - - final int queryServerPort = configuration.getInteger( - ConfigConstants.QUERYABLE_STATE_SERVER_PORT, - ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT); - - final int queryServerNetworkThreads = configuration.getInteger( - ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS, - ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS); - - final int queryServerQueryThreads = configuration.getInteger( - ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS, - ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS); - - final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration( - numNetworkBuffers, - pageSize, - memType, - ioMode, - queryServerPort, - queryServerNetworkThreads, - queryServerQueryThreads, - Option.apply(nettyConfig), - 500, - 30000); - - // ----> timeouts, library caching, profiling - - final FiniteDuration timeout; - try { - timeout = AkkaUtils.getTimeout(configuration); - } catch (Exception e) { - throw new IllegalArgumentException( - "Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT + - "'.Use formats like '50 s' or '1 min' to specify the timeout."); - } - LOG.info("Messages between TaskManager and JobManager have a max timeout of " + timeout); - - final long cleanupInterval = configuration.getLong( - ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, - ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000; - - final FiniteDuration finiteRegistrationDuration; - try { - Duration maxRegistrationDuration = Duration.create(configuration.getString( - ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, - ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION)); - if (maxRegistrationDuration.isFinite()) { - finiteRegistrationDuration = new FiniteDuration(maxRegistrationDuration.toSeconds(), TimeUnit.SECONDS); - } else { - finiteRegistrationDuration = null; - } - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid format for parameter " + - ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e); - } - - final FiniteDuration initialRegistrationPause; - try { - Duration pause = Duration.create(configuration.getString( - ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, - ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE)); - if (pause.isFinite()) { - initialRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS); - } else { - throw new IllegalArgumentException("The initial registration pause must be finite: " + pause); - } - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid format for parameter " + - ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); - } - - final FiniteDuration maxRegistrationPause; - try { - Duration pause = Duration.create(configuration.getString( - ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE, - ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE)); - if (pause.isFinite()) { - maxRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS); - } else { - throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause); - } - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid format for parameter " + - ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); - } - - final FiniteDuration refusedRegistrationPause; - try { - Duration pause = Duration.create(configuration.getString( - ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, - ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE)); - if (pause.isFinite()) { - refusedRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS); - } else { - throw new IllegalArgumentException("The refused registration pause must be finite: " + pause); - } - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid format for parameter " + - ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); - } - - return new TaskExecutorConfiguration( - tmpDirs, - cleanupInterval, - networkConfig, - timeout, - finiteRegistrationDuration, - slots, - configuration, - initialRegistrationPause, - maxRegistrationPause, - refusedRegistrationPause); - } - - /** - * Validates a condition for a config parameter and displays a standard exception, if the - * the condition does not hold. - * - * @param condition The condition that must hold. If the condition is false, an exception is thrown. - * @param parameter The parameter value. Will be shown in the exception message. - * @param name The name of the config parameter. Will be shown in the exception message. - * @param errorMessage The optional custom error message to append to the exception message. - */ - private static void checkConfigParameter( - boolean condition, - Object parameter, - String name, - String errorMessage) { - if (!condition) { - throw new IllegalConfigurationException("Invalid configuration value for " + name + " : " + parameter + " - " + errorMessage); - } - } - - /** - * Validates that all the directories denoted by the strings do actually exist, are proper - * directories (not files), and are writable. - * - * @param tmpDirs The array of directory paths to check. - * @throws Exception Thrown if any of the directories does not exist or is not writable - * or is a file, rather than a directory. - */ - private static void checkTempDirs(String[] tmpDirs) throws IOException { - for (String dir : tmpDirs) { - if (dir != null && !dir.equals("")) { - File file = new File(dir); - if (!file.exists()) { - throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist."); - } - if (!file.isDirectory()) { - throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not a directory."); - } - if (!file.canWrite()) { - throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not writable."); - } - - if (LOG.isInfoEnabled()) { - long totalSpaceGb = file.getTotalSpace() >> 30; - long usableSpaceGb = file.getUsableSpace() >> 30; - double usablePercentage = (double)usableSpaceGb / totalSpaceGb * 100; - String path = file.getAbsolutePath(); - LOG.info(String.format("Temporary file directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)", - path, totalSpaceGb, usableSpaceGb, usablePercentage)); - } - } else { - throw new IllegalArgumentException("Temporary file directory #$id is null."); - } - } - } - // ------------------------------------------------------------------------ // Properties // ------------------------------------------------------------------------ public ResourceID getResourceID() { - return resourceID; + return taskManagerLocation.getResourceID(); } // ------------------------------------------------------------------------ @@ -959,37 +254,4 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } } - private static class TaskManagerComponents { - private final TaskManagerLocation taskManagerLocation; - private final MemoryManager memoryManager; - private final IOManager ioManager; - private final NetworkEnvironment networkEnvironment; - - private TaskManagerComponents( - TaskManagerLocation taskManagerLocation, - MemoryManager memoryManager, - IOManager ioManager, - NetworkEnvironment networkEnvironment) { - this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); - this.memoryManager = Preconditions.checkNotNull(memoryManager); - this.ioManager = Preconditions.checkNotNull(ioManager); - this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment); - } - - public MemoryManager getMemoryManager() { - return memoryManager; - } - - public IOManager getIOManager() { - return ioManager; - } - - public NetworkEnvironment getNetworkEnvironment() { - return networkEnvironment; - } - - public TaskManagerLocation getTaskManagerLocation() { - return taskManagerLocation; - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/c34f13c2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java new file mode 100644 index 0000000..4f756fb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java @@ -0,0 +1,749 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.memory.HeapMemorySegment; +import org.apache.flink.core.memory.HybridMemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.io.network.ConnectionManager; +import org.apache.flink.runtime.io.network.LocalConnectionManager; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.io.network.TaskEventDispatcher; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.netty.NettyConfig; +import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; +import org.apache.flink.runtime.query.netty.KvStateServer; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.taskexecutor.TaskExecutorConfiguration; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.NetUtils; + +import akka.actor.ActorSystem; +import akka.util.Timeout; +import com.typesafe.config.Config; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Some; +import scala.Tuple2; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class is the executable entry point for the task manager in yarn or standalone mode. + * It constructs the related components (network, I/O manager, memory manager, RPC service, HA service) + * and starts them. + */ +public class TaskManagerRunner { + + private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class); + + /** + * Constructs related components of the TaskManager and starts them. + * + * @param configuration The configuration for the TaskManager. + * @param resourceID The id of the resource which the task manager will run on. + * @param rpcService Optionally, The rpc service which is used to start and connect to the TaskManager RpcEndpoint . + * If none is given, then a RpcService is constructed from the configuration. + * @param taskManagerHostname Optionally, The hostname/address that describes the TaskManager's data location. + * If none is given, it can be got from the configuration. + * @param localTaskManagerCommunication If true, the TaskManager will not initiate the TCP network stack. + * @param haServices Optionally, a high availability service can be provided. If none is given, + * then a HighAvailabilityServices is constructed from the configuration. + */ + public static void createAndStartComponents( + final Configuration configuration, + final ResourceID resourceID, + RpcService rpcService, + String taskManagerHostname, + boolean localTaskManagerCommunication, + HighAvailabilityServices haServices) throws Exception { + + checkNotNull(configuration); + checkNotNull(resourceID); + + if (taskManagerHostname == null || taskManagerHostname.isEmpty()) { + taskManagerHostname = selectNetworkInterface(configuration); + } + + if (rpcService == null) { + // if no task manager port has been configured, use 0 (system will pick any free port) + final int actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0); + if (actorSystemPort < 0 || actorSystemPort > 65535) { + throw new IllegalConfigurationException("Invalid value for '" + + ConfigConstants.TASK_MANAGER_IPC_PORT_KEY + + "' (port for the TaskManager actor system) : " + actorSystemPort + + " - Leave config parameter empty or use 0 to let the system choose a port automatically."); + } + rpcService = createRpcService(configuration, taskManagerHostname, actorSystemPort); + } + + if(haServices == null) { + // start high availability service to implement getResourceManagerLeaderRetriever method only + haServices = new HighAvailabilityServices() { + @Override + public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { + return LeaderRetrievalUtils.createLeaderRetrievalService(configuration); + } + + @Override + public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + return null; + } + + @Override + public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception { + return null; + } + }; + } + + createAndStartTaskManagerComponents( + configuration, + resourceID, + rpcService, + taskManagerHostname, + haServices, + localTaskManagerCommunication); + } + + /** + * <p/> + * This method tries to select the network interface to use for the TaskManager + * communication. The network interface is used both for the actor communication + * (coordination) as well as for the data exchange between task managers. Unless + * the hostname/interface is explicitly configured in the configuration, this + * method will try out various interfaces and methods to connect to the JobManager + * and select the one where the connection attempt is successful. + * <p/> + * + * @param configuration The configuration for the TaskManager. + * @return The host name under which the TaskManager communicates. + */ + private static String selectNetworkInterface(Configuration configuration) throws Exception { + String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null); + if (taskManagerHostname != null) { + LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname); + } else { + LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration); + FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration); + + InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, lookupTimeout); + taskManagerHostname = taskManagerAddress.getHostName(); + LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.", + taskManagerHostname, taskManagerAddress.getHostAddress()); + } + + return taskManagerHostname; + } + + /** + * Utility method to create RPC service from configuration and hostname, port. + * + * @param configuration The configuration for the TaskManager. + * @param taskManagerHostname The hostname/address that describes the TaskManager's data location. + * @param actorSystemPort If true, the TaskManager will not initiate the TCP network stack. + * @return The rpc service which is used to start and connect to the TaskManager RpcEndpoint . + * @throws java.io.IOException Thrown, if the actor system can not bind to the address + * @throws java.lang.Exception Thrown is some other error occurs while creating akka actor system + */ + private static RpcService createRpcService(Configuration configuration, String taskManagerHostname, int actorSystemPort) + throws Exception{ + + // Bring up the TaskManager actor system first, bind it to the given address. + + LOG.info("Starting TaskManager actor system at " + + NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort)); + + final ActorSystem taskManagerSystem; + try { + Tuple2<String, Object> address = new Tuple2<String, Object>(taskManagerHostname, actorSystemPort); + Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, new Some<>(address)); + LOG.debug("Using akka configuration\n " + akkaConfig); + taskManagerSystem = AkkaUtils.createActorSystem(akkaConfig); + } catch (Throwable t) { + if (t instanceof org.jboss.netty.channel.ChannelException) { + Throwable cause = t.getCause(); + if (cause != null && t.getCause() instanceof java.net.BindException) { + String address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort); + throw new IOException("Unable to bind TaskManager actor system to address " + + address + " - " + cause.getMessage(), t); + } + } + throw new Exception("Could not create TaskManager actor system", t); + } + + // start akka rpc service based on actor system + final Timeout timeout = new Timeout(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS); + final AkkaRpcService akkaRpcService = new AkkaRpcService(taskManagerSystem, timeout); + + return akkaRpcService; + } + + /** + * @param configuration The configuration for the TaskManager. + * @param resourceID The id of the resource which the task manager will run on. + * @param rpcService The rpc service which is used to start and connect to the TaskManager RpcEndpoint . + * @param taskManagerHostname The hostname/address that describes the TaskManager's data location. + * @param haServices Optionally, a high availability service can be provided. If none is given, + * then a HighAvailabilityServices is constructed from the configuration. + * @param localTaskManagerCommunication If true, the TaskManager will not initiate the TCP network stack. + * @throws IllegalConfigurationException Thrown, if the given config contains illegal values. + * @throws IOException Thrown, if any of the I/O components (such as buffer pools, I/O manager, ...) + * cannot be properly started. + * @throws Exception Thrown is some other error occurs while parsing the configuration or + * starting the TaskManager components. + */ + private static void createAndStartTaskManagerComponents( + Configuration configuration, + ResourceID resourceID, + RpcService rpcService, + String taskManagerHostname, + HighAvailabilityServices haServices, + boolean localTaskManagerCommunication) throws Exception { + + final TaskExecutorConfiguration taskManagerConfig = parseTaskManagerConfiguration( + configuration, taskManagerHostname, localTaskManagerCommunication); + + TaskManagerComponents taskManagerComponents = createTaskManagerComponents( + resourceID, + InetAddress.getByName(taskManagerHostname), + taskManagerConfig, + configuration); + + final TaskExecutor taskExecutor = new TaskExecutor( + taskManagerConfig, + taskManagerComponents.getTaskManagerLocation(), + rpcService, taskManagerComponents.getMemoryManager(), + taskManagerComponents.getIOManager(), + taskManagerComponents.getNetworkEnvironment(), + haServices); + + taskExecutor.start(); + } + + /** + * Creates and returns the task manager components. + * + * @param resourceID resource ID of the task manager + * @param taskManagerAddress address of the task manager + * @param taskExecutorConfig task manager configuration + * @param configuration of Flink + * @return task manager components + * @throws Exception + */ + private static TaskManagerComponents createTaskManagerComponents( + ResourceID resourceID, + InetAddress taskManagerAddress, + TaskExecutorConfiguration taskExecutorConfig, + Configuration configuration) throws Exception { + + MemoryType memType = taskExecutorConfig.getNetworkConfig().memoryType(); + + // pre-start checks + checkTempDirs(taskExecutorConfig.getTmpDirPaths()); + + NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskExecutorConfig.getNetworkConfig(); + + NetworkBufferPool networkBufferPool = new NetworkBufferPool( + networkEnvironmentConfiguration.numNetworkBuffers(), + networkEnvironmentConfiguration.networkBufferSize(), + networkEnvironmentConfiguration.memoryType()); + + ConnectionManager connectionManager; + + if (networkEnvironmentConfiguration.nettyConfig().isDefined()) { + connectionManager = new NettyConnectionManager(networkEnvironmentConfiguration.nettyConfig().get()); + } else { + connectionManager = new LocalConnectionManager(); + } + + ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); + TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher(); + + KvStateRegistry kvStateRegistry = new KvStateRegistry(); + + KvStateServer kvStateServer; + + if (networkEnvironmentConfiguration.nettyConfig().isDefined()) { + NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig().get(); + + int numNetworkThreads = networkEnvironmentConfiguration.queryServerNetworkThreads() == 0 ? + nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerNetworkThreads(); + + int numQueryThreads = networkEnvironmentConfiguration.queryServerQueryThreads() == 0 ? + nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerQueryThreads(); + + kvStateServer = new KvStateServer( + taskManagerAddress, + networkEnvironmentConfiguration.queryServerPort(), + numNetworkThreads, + numQueryThreads, + kvStateRegistry, + new DisabledKvStateRequestStats()); + } else { + kvStateServer = null; + } + + // we start the network first, to make sure it can allocate its buffers first + final NetworkEnvironment network = new NetworkEnvironment( + networkBufferPool, + connectionManager, + resultPartitionManager, + taskEventDispatcher, + kvStateRegistry, + kvStateServer, + networkEnvironmentConfiguration.ioMode(), + networkEnvironmentConfiguration.partitionRequestInitialBackoff(), + networkEnvironmentConfiguration.partitinRequestMaxBackoff()); + + network.start(); + + final TaskManagerLocation taskManagerLocation = new TaskManagerLocation( + resourceID, + taskManagerAddress, + network.getConnectionManager().getDataPort()); + + // computing the amount of memory to use depends on how much memory is available + // it strictly needs to happen AFTER the network stack has been initialized + + // check if a value has been configured + long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L); + checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory, + ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, + "MemoryManager needs at least one MB of memory. " + + "If you leave this config parameter empty, the system automatically " + + "pick a fraction of the available memory."); + + final long memorySize; + boolean preAllocateMemory = configuration.getBoolean( + ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE); + if (configuredMemory > 0) { + if (preAllocateMemory) { + LOG.info("Using {} MB for managed memory." , configuredMemory); + } else { + LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory); + } + memorySize = configuredMemory << 20; // megabytes to bytes + } else { + float fraction = configuration.getFloat( + ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, + ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION); + checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction, + ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, + "MemoryManager fraction of the free memory must be between 0.0 and 1.0"); + + if (memType == MemoryType.HEAP) { + long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction); + if (preAllocateMemory) { + LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." , + fraction , relativeMemSize >> 20); + } else { + LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " + + "memory will be allocated lazily." , fraction , relativeMemSize >> 20); + } + memorySize = relativeMemSize; + } else if (memType == MemoryType.OFF_HEAP) { + // The maximum heap memory has been adjusted according to the fraction + long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory(); + long directMemorySize = (long) (maxMemory / (1.0 - fraction) * fraction); + if (preAllocateMemory) { + LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." , + fraction, directMemorySize >> 20); + } else { + LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," + + " memory will be allocated lazily.", fraction, directMemorySize >> 20); + } + memorySize = directMemorySize; + } else { + throw new RuntimeException("No supported memory type detected."); + } + } + + // now start the memory manager + final MemoryManager memoryManager; + try { + memoryManager = new MemoryManager( + memorySize, + taskExecutorConfig.getNumberOfSlots(), + taskExecutorConfig.getNetworkConfig().networkBufferSize(), + memType, + preAllocateMemory); + } catch (OutOfMemoryError e) { + if (memType == MemoryType.HEAP) { + throw new Exception("OutOfMemory error (" + e.getMessage() + + ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e); + } else if (memType == MemoryType.OFF_HEAP) { + throw new Exception("OutOfMemory error (" + e.getMessage() + + ") while allocating the TaskManager off-heap memory (" + memorySize + + " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e); + } else { + throw e; + } + } + + // start the I/O manager, it will create some temp directories. + final IOManager ioManager = new IOManagerAsync(taskExecutorConfig.getTmpDirPaths()); + + return new TaskManagerComponents(taskManagerLocation, memoryManager, ioManager, network); + } + + // -------------------------------------------------------------------------- + // Parsing and checking the TaskManager Configuration + // -------------------------------------------------------------------------- + + /** + * Utility method to extract TaskManager config parameters from the configuration and to + * sanity check them. + * + * @param configuration The configuration. + * @param taskManagerHostname The host name under which the TaskManager communicates. + * @param localTaskManagerCommunication True, to skip initializing the network stack. + * Use only in cases where only one task manager runs. + * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc. + */ + private static TaskExecutorConfiguration parseTaskManagerConfiguration( + Configuration configuration, + String taskManagerHostname, + boolean localTaskManagerCommunication) throws Exception { + + // ------- read values from the config and check them --------- + // (a lot of them) + + // ----> hosts / ports for communication and data exchange + + int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT); + + checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, + "Leave config parameter empty or use 0 to let the system choose a port automatically."); + + InetAddress taskManagerAddress = InetAddress.getByName(taskManagerHostname); + final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport); + + // ----> memory / network stack (shuffles/broadcasts), task slots, temp directories + + // we need this because many configs have been written with a "-1" entry + int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); + if (slots == -1) { + slots = 1; + } + + checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, + "Number of task slots must be at least one."); + + final int numNetworkBuffers = configuration.getInteger( + ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS); + + checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers, + ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, ""); + + final int pageSize = configuration.getInteger( + ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE); + + // check page size of for minimum size + checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize, + ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, + "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE); + + // check page size for power of two + checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize, + ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, + "Memory segment size must be a power of 2."); + + // check whether we use heap or off-heap memory + final MemoryType memType; + if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) { + memType = MemoryType.OFF_HEAP; + } else { + memType = MemoryType.HEAP; + } + + // initialize the memory segment factory accordingly + if (memType == MemoryType.HEAP) { + if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) { + throw new Exception("Memory type is set to heap memory, but memory segment " + + "factory has been initialized for off-heap memory segments"); + } + } else { + if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) { + throw new Exception("Memory type is set to off-heap memory, but memory segment " + + "factory has been initialized for heap memory segments"); + } + } + + final String[] tmpDirs = configuration.getString( + ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator); + + final NettyConfig nettyConfig; + if (!localTaskManagerCommunication) { + nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), + taskManagerInetSocketAddress.getPort(), pageSize, slots, configuration); + } else { + nettyConfig = null; + } + + // Default spill I/O mode for intermediate results + final String syncOrAsync = configuration.getString( + ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE, + ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE); + + final IOManager.IOMode ioMode; + if (syncOrAsync.equals("async")) { + ioMode = IOManager.IOMode.ASYNC; + } else { + ioMode = IOManager.IOMode.SYNC; + } + + final int queryServerPort = configuration.getInteger( + ConfigConstants.QUERYABLE_STATE_SERVER_PORT, + ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT); + + final int queryServerNetworkThreads = configuration.getInteger( + ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS, + ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS); + + final int queryServerQueryThreads = configuration.getInteger( + ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS, + ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS); + + final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration( + numNetworkBuffers, + pageSize, + memType, + ioMode, + queryServerPort, + queryServerNetworkThreads, + queryServerQueryThreads, + Option.apply(nettyConfig), + 500, + 3000); + + // ----> timeouts, library caching, profiling + + final FiniteDuration timeout; + try { + timeout = AkkaUtils.getTimeout(configuration); + } catch (Exception e) { + throw new IllegalArgumentException( + "Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT + + "'.Use formats like '50 s' or '1 min' to specify the timeout."); + } + LOG.info("Messages between TaskManager and JobManager have a max timeout of " + timeout); + + final long cleanupInterval = configuration.getLong( + ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, + ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000; + + final FiniteDuration finiteRegistrationDuration; + try { + Duration maxRegistrationDuration = Duration.create(configuration.getString( + ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, + ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION)); + if (maxRegistrationDuration.isFinite()) { + finiteRegistrationDuration = new FiniteDuration(maxRegistrationDuration.toSeconds(), TimeUnit.SECONDS); + } else { + finiteRegistrationDuration = null; + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid format for parameter " + + ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e); + } + + final FiniteDuration initialRegistrationPause; + try { + Duration pause = Duration.create(configuration.getString( + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, + ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE)); + if (pause.isFinite()) { + initialRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS); + } else { + throw new IllegalArgumentException("The initial registration pause must be finite: " + pause); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid format for parameter " + + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); + } + + final FiniteDuration maxRegistrationPause; + try { + Duration pause = Duration.create(configuration.getString( + ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE, + ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE)); + if (pause.isFinite()) { + maxRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS); + } else { + throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid format for parameter " + + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); + } + + final FiniteDuration refusedRegistrationPause; + try { + Duration pause = Duration.create(configuration.getString( + ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, + ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE)); + if (pause.isFinite()) { + refusedRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS); + } else { + throw new IllegalArgumentException("The refused registration pause must be finite: " + pause); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid format for parameter " + + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); + } + + return new TaskExecutorConfiguration( + tmpDirs, + cleanupInterval, + networkConfig, + timeout, + finiteRegistrationDuration, + slots, + configuration, + initialRegistrationPause, + maxRegistrationPause, + refusedRegistrationPause); + } + + /** + * Validates a condition for a config parameter and displays a standard exception, if the + * the condition does not hold. + * + * @param condition The condition that must hold. If the condition is false, an exception is thrown. + * @param parameter The parameter value. Will be shown in the exception message. + * @param name The name of the config parameter. Will be shown in the exception message. + * @param errorMessage The optional custom error message to append to the exception message. + */ + private static void checkConfigParameter( + boolean condition, + Object parameter, + String name, + String errorMessage) { + if (!condition) { + throw new IllegalConfigurationException("Invalid configuration value for " + name + " : " + parameter + " - " + errorMessage); + } + } + + /** + * Validates that all the directories denoted by the strings do actually exist, are proper + * directories (not files), and are writable. + * + * @param tmpDirs The array of directory paths to check. + * @throws Exception Thrown if any of the directories does not exist or is not writable + * or is a file, rather than a directory. + */ + private static void checkTempDirs(String[] tmpDirs) throws IOException { + for (String dir : tmpDirs) { + if (dir != null && !dir.equals("")) { + File file = new File(dir); + if (!file.exists()) { + throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist."); + } + if (!file.isDirectory()) { + throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not a directory."); + } + if (!file.canWrite()) { + throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not writable."); + } + + if (LOG.isInfoEnabled()) { + long totalSpaceGb = file.getTotalSpace() >> 30; + long usableSpaceGb = file.getUsableSpace() >> 30; + double usablePercentage = (double)usableSpaceGb / totalSpaceGb * 100; + String path = file.getAbsolutePath(); + LOG.info(String.format("Temporary file directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)", + path, totalSpaceGb, usableSpaceGb, usablePercentage)); + } + } else { + throw new IllegalArgumentException("Temporary file directory #$id is null."); + } + } + } + + private static class TaskManagerComponents { + private final TaskManagerLocation taskManagerLocation; + private final MemoryManager memoryManager; + private final IOManager ioManager; + private final NetworkEnvironment networkEnvironment; + + private TaskManagerComponents( + TaskManagerLocation taskManagerLocation, + MemoryManager memoryManager, + IOManager ioManager, + NetworkEnvironment networkEnvironment) { + + this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); + this.memoryManager = Preconditions.checkNotNull(memoryManager); + this.ioManager = Preconditions.checkNotNull(ioManager); + this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment); + } + + public MemoryManager getMemoryManager() { + return memoryManager; + } + + public IOManager getIOManager() { + return ioManager; + } + + public NetworkEnvironment getNetworkEnvironment() { + return networkEnvironment; + } + + public TaskManagerLocation getTaskManagerLocation() { + return taskManagerLocation; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c34f13c2/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 09aab18..26218dd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -19,17 +19,22 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.highavailability.NonHaServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.TestLogger; import org.junit.Test; +import org.powermock.api.mockito.PowerMockito; import java.util.UUID; import static org.junit.Assert.*; @@ -42,19 +47,31 @@ public class TaskExecutorTest extends TestLogger { final ResourceID resourceID = ResourceID.generate(); final String resourceManagerAddress = "/resource/manager/address/one"; - final TestingRpcService rpc = new TestingRpcService(); + final TestingSerialRpcService rpc = new TestingSerialRpcService(); try { // register a mock resource manager gateway ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class); + TaskExecutorConfiguration taskExecutorConfiguration = mock(TaskExecutorConfiguration.class); + PowerMockito.when(taskExecutorConfiguration.getNumberOfSlots()).thenReturn(1); rpc.registerGateway(resourceManagerAddress, rmGateway); + TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class); + when(taskManagerLocation.getResourceID()).thenReturn(resourceID); + NonHaServices haServices = new NonHaServices(resourceManagerAddress); - TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor( - new Configuration(), resourceID, rpc, "localhost", haServices, true); - String taskManagerAddress = taskManager.getAddress(); + + TaskExecutor taskManager = new TaskExecutor( + taskExecutorConfiguration, + taskManagerLocation, + rpc, mock(MemoryManager.class), + mock(IOManager.class), + mock(NetworkEnvironment.class), + haServices); + taskManager.start(); + String taskManagerAddress = taskManager.getAddress(); - verify(rmGateway, timeout(5000)).registerTaskExecutor( + verify(rmGateway).registerTaskExecutor( any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(Time.class)); } finally { @@ -71,7 +88,7 @@ public class TaskExecutorTest extends TestLogger { final UUID leaderId1 = UUID.randomUUID(); final UUID leaderId2 = UUID.randomUUID(); - final TestingRpcService rpc = new TestingRpcService(); + final TestingSerialRpcService rpc = new TestingSerialRpcService(); try { // register the mock resource manager gateways ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class); @@ -84,10 +101,22 @@ public class TaskExecutorTest extends TestLogger { TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); haServices.setResourceManagerLeaderRetriever(testLeaderService); - TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor( - new Configuration(), resourceID, rpc, "localhost", haServices, true); - String taskManagerAddress = taskManager.getAddress(); + TaskExecutorConfiguration taskExecutorConfiguration = mock(TaskExecutorConfiguration.class); + PowerMockito.when(taskExecutorConfiguration.getNumberOfSlots()).thenReturn(1); + + TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class); + when(taskManagerLocation.getResourceID()).thenReturn(resourceID); + + TaskExecutor taskManager = new TaskExecutor( + taskExecutorConfiguration, + taskManagerLocation, + rpc, mock(MemoryManager.class), + mock(IOManager.class), + mock(NetworkEnvironment.class), + haServices); + taskManager.start(); + String taskManagerAddress = taskManager.getAddress(); // no connection initially, since there is no leader assertNull(taskManager.getResourceManagerConnection()); @@ -95,7 +124,7 @@ public class TaskExecutorTest extends TestLogger { // define a leader and see that a registration happens testLeaderService.notifyListener(address1, leaderId1); - verify(rmGateway1, timeout(5000)).registerTaskExecutor( + verify(rmGateway1).registerTaskExecutor( eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(Time.class)); assertNotNull(taskManager.getResourceManagerConnection()); @@ -105,7 +134,7 @@ public class TaskExecutorTest extends TestLogger { // set a new leader, see that a registration happens testLeaderService.notifyListener(address2, leaderId2); - verify(rmGateway2, timeout(5000)).registerTaskExecutor( + verify(rmGateway2).registerTaskExecutor( eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(Time.class)); assertNotNull(taskManager.getResourceManagerConnection()); }