http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java new file mode 100644 index 0000000..4871b96 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -0,0 +1,827 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import akka.actor.ActorSystem; +import akka.dispatch.ExecutionContexts$; +import akka.util.Timeout; +import com.typesafe.config.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.memory.HeapMemorySegment; +import org.apache.flink.core.memory.HybridMemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.io.network.netty.NettyConfig; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcMethod; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.runtime.taskmanager.MemoryLogger; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; 
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.NetUtils; + +import scala.Tuple2; +import scala.Option; +import scala.Some; +import scala.concurrent.ExecutionContext; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.UUID; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * TaskExecutor implementation. The task executor is responsible for the execution of multiple + * {@link org.apache.flink.runtime.taskmanager.Task}. + */ +public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { + + private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class); + + /** The unique resource ID of this TaskExecutor */ + private final ResourceID resourceID; + + /** The access to the leader election and metadata storage services */ + private final HighAvailabilityServices haServices; + + /** The task manager configuration */ + private final TaskExecutorConfiguration taskExecutorConfig; + + /** The I/O manager component in the task manager */ + private final IOManager ioManager; + + /** The memory manager component in the task manager */ + private final MemoryManager memoryManager; + + /** The network component in the task manager */ + private final NetworkEnvironment networkEnvironment; + + /** The number of slots in the task manager, should be 1 for YARN */ + private final int numberOfSlots; + + // --------- resource manager -------- + + private TaskExecutorToResourceManagerConnection resourceManagerConnection; + + // ------------------------------------------------------------------------ + + public TaskExecutor( + TaskExecutorConfiguration taskExecutorConfig, + ResourceID 
resourceID, + MemoryManager memoryManager, + IOManager ioManager, + NetworkEnvironment networkEnvironment, + int numberOfSlots, + RpcService rpcService, + HighAvailabilityServices haServices) { + + super(rpcService); + + this.taskExecutorConfig = checkNotNull(taskExecutorConfig); + this.resourceID = checkNotNull(resourceID); + this.memoryManager = checkNotNull(memoryManager); + this.ioManager = checkNotNull(ioManager); + this.networkEnvironment = checkNotNull(networkEnvironment); + this.numberOfSlots = checkNotNull(numberOfSlots); + this.haServices = checkNotNull(haServices); + } + + // ------------------------------------------------------------------------ + // Life cycle + // ------------------------------------------------------------------------ + + @Override + public void start() { + super.start(); + + // start by connecting to the ResourceManager + try { + haServices.getResourceManagerLeaderRetriever().start(new ResourceManagerLeaderListener()); + } catch (Exception e) { + onFatalErrorAsync(e); + } + } + + // ------------------------------------------------------------------------ + // RPC methods - ResourceManager related + // ------------------------------------------------------------------------ + + @RpcMethod + public void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLeaderId) { + if (resourceManagerConnection != null) { + if (newLeaderAddress != null) { + // the resource manager switched to a new leader + log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", + resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress); + } + else { + // address null means that the current leader is lost without a new leader being there, yet + log.info("Current ResourceManager {} lost leader status. 
Waiting for new ResourceManager leader.", + resourceManagerConnection.getResourceManagerAddress()); + } + + // drop the current connection or connection attempt + if (resourceManagerConnection != null) { + resourceManagerConnection.close(); + resourceManagerConnection = null; + } + } + + // establish a connection to the new leader + if (newLeaderAddress != null) { + log.info("Attempting to register at ResourceManager {}", newLeaderAddress); + resourceManagerConnection = + new TaskExecutorToResourceManagerConnection(log, this, newLeaderAddress, newLeaderId); + resourceManagerConnection.start(); + } + } + + /** + * Starts and runs the TaskManager. + * <p/> + * This method first tries to select the network interface to use for the TaskManager + * communication. The network interface is used both for the actor communication + * (coordination) as well as for the data exchange between task managers. Unless + * the hostname/interface is explicitly configured in the configuration, this + * method will try out various interfaces and methods to connect to the JobManager + * and select the one where the connection attempt is successful. + * <p/> + * After selecting the network interface, this method brings up an actor system + * for the TaskManager and its actors, starts the TaskManager's services + * (library cache, shuffle network stack, ...), and starts the TaskManager itself. + * + * @param configuration The configuration for the TaskManager. + * @param resourceID The id of the resource which the task manager will run on. 
+ */ + public static void selectNetworkInterfaceAndRunTaskManager( + Configuration configuration, + ResourceID resourceID) throws Exception { + + final InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration); + + runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration); + } + + private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration) + throws Exception { + String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null); + if (taskManagerHostname != null) { + LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname); + } else { + LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration); + FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration); + + InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, lookupTimeout); + taskManagerHostname = taskManagerAddress.getHostName(); + LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.", + taskManagerHostname, taskManagerAddress.getHostAddress()); + } + + // if no task manager port has been configured, use 0 (system will pick any free port) + final int actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0); + if (actorSystemPort < 0 || actorSystemPort > 65535) { + throw new IllegalConfigurationException("Invalid value for '" + + ConfigConstants.TASK_MANAGER_IPC_PORT_KEY + + "' (port for the TaskManager actor system) : " + actorSystemPort + + " - Leave config parameter empty or use 0 to let the system choose a port automatically."); + } + + return new InetSocketAddress(taskManagerHostname, actorSystemPort); + } + + /** + * Starts and runs the TaskManager. 
Brings up an actor system for the TaskManager and its + * actors, starts the TaskManager's services (library cache, shuffle network stack, ...), + * and starts the TaskManager itself. + * <p/> + * This method will also spawn a process reaper for the TaskManager (kill the process if + * the actor fails) and optionally start the JVM memory logging thread. + * + * @param taskManagerHostname The hostname/address of the interface where the actor system + * will communicate. + * @param resourceID The id of the resource which the task manager will run on. + * @param actorSystemPort The port at which the actor system will communicate. + * @param configuration The configuration for the TaskManager. + */ + private static void runTaskManager( + String taskManagerHostname, + ResourceID resourceID, + int actorSystemPort, + final Configuration configuration) throws Exception { + + LOG.info("Starting TaskManager"); + + // Bring up the TaskManager actor system first, bind it to the given address. + + LOG.info("Starting TaskManager actor system at " + + NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort)); + + final ActorSystem taskManagerSystem; + try { + Tuple2<String, Object> address = new Tuple2<String, Object>(taskManagerHostname, actorSystemPort); + Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, new Some<>(address)); + LOG.debug("Using akka configuration\n " + akkaConfig); + taskManagerSystem = AkkaUtils.createActorSystem(akkaConfig); + } catch (Throwable t) { + if (t instanceof org.jboss.netty.channel.ChannelException) { + Throwable cause = t.getCause(); + if (cause != null && t.getCause() instanceof java.net.BindException) { + String address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort); + throw new IOException("Unable to bind TaskManager actor system to address " + + address + " - " + cause.getMessage(), t); + } + } + throw new Exception("Could not create TaskManager actor system", t); + } + + // start akka rpc 
service based on actor system + final Timeout timeout = new Timeout(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS); + final AkkaRpcService akkaRpcService = new AkkaRpcService(taskManagerSystem, timeout); + + // start high availability service to implement getResourceManagerLeaderRetriever method only + final HighAvailabilityServices haServices = new HighAvailabilityServices() { + @Override + public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { + return LeaderRetrievalUtils.createLeaderRetrievalService(configuration); + } + + @Override + public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + return null; + } + + @Override + public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception { + return null; + } + }; + + // start all the TaskManager services (network stack, library cache, ...) + // and the TaskManager actor + try { + LOG.info("Starting TaskManager actor"); + TaskExecutor taskExecutor = startTaskManagerComponentsAndActor( + configuration, + resourceID, + akkaRpcService, + taskManagerHostname, + haServices, + false); + + taskExecutor.start(); + + // if desired, start the logging daemon that periodically logs the memory usage information + if (LOG.isInfoEnabled() && configuration.getBoolean( + ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD, + ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) { + LOG.info("Starting periodic memory usage logger"); + + long interval = configuration.getLong( + ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS, + ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS); + + MemoryLogger logger = new MemoryLogger(LOG, interval, taskManagerSystem); + logger.start(); + } + + // block until everything is done + taskManagerSystem.awaitTermination(); + } catch (Throwable t) { + LOG.error("Error while starting up taskManager", t); + try { + 
taskManagerSystem.shutdown(); + } catch (Throwable tt) { + LOG.warn("Could not cleanly shut down actor system", tt); + } + throw t; + } + } + + // -------------------------------------------------------------------------- + // Starting and running the TaskManager + // -------------------------------------------------------------------------- + + /** + * @param configuration The configuration for the TaskManager. + * @param resourceID The id of the resource which the task manager will run on. + * @param rpcService The rpc service which is used to start and connect to the TaskManager RpcEndpoint . + * @param taskManagerHostname The hostname/address that describes the TaskManager's data location. + * @param haServices Optionally, a high availability service can be provided. If none is given, + * then a HighAvailabilityServices is constructed from the configuration. + * @param localTaskManagerCommunication If true, the TaskManager will not initiate the TCP network stack. + * @return An ActorRef to the TaskManager actor. + * @throws org.apache.flink.configuration.IllegalConfigurationException Thrown, if the given config contains illegal values. + * @throws java.io.IOException Thrown, if any of the I/O components (such as buffer pools, + * I/O manager, ...) cannot be properly started. + * @throws java.lang.Exception Thrown is some other error occurs while parsing the configuration + * or starting the TaskManager components. 
+ */ + public static TaskExecutor startTaskManagerComponentsAndActor( + Configuration configuration, + ResourceID resourceID, + RpcService rpcService, + String taskManagerHostname, + HighAvailabilityServices haServices, + boolean localTaskManagerCommunication) throws Exception { + + final TaskExecutorConfiguration taskExecutorConfig = parseTaskManagerConfiguration( + configuration, taskManagerHostname, localTaskManagerCommunication); + + MemoryType memType = taskExecutorConfig.getNetworkConfig().memoryType(); + + // pre-start checks + checkTempDirs(taskExecutorConfig.getTmpDirPaths()); + + ExecutionContext executionContext = ExecutionContexts$.MODULE$.fromExecutor(new ForkJoinPool()); + + // we start the network first, to make sure it can allocate its buffers first + final NetworkEnvironment network = new NetworkEnvironment( + executionContext, + taskExecutorConfig.getTimeout(), + taskExecutorConfig.getNetworkConfig(), + taskExecutorConfig.getConnectionInfo()); + + // computing the amount of memory to use depends on how much memory is available + // it strictly needs to happen AFTER the network stack has been initialized + + // check if a value has been configured + long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L); + checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory, + ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, + "MemoryManager needs at least one MB of memory. " + + "If you leave this config parameter empty, the system automatically " + + "pick a fraction of the available memory."); + + final long memorySize; + boolean preAllocateMemory = configuration.getBoolean( + ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE); + if (configuredMemory > 0) { + if (preAllocateMemory) { + LOG.info("Using {} MB for managed memory." 
, configuredMemory); + } else { + LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory); + } + memorySize = configuredMemory << 20; // megabytes to bytes + } else { + float fraction = configuration.getFloat( + ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, + ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION); + checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction, + ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, + "MemoryManager fraction of the free memory must be between 0.0 and 1.0"); + + if (memType == MemoryType.HEAP) { + long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction); + if (preAllocateMemory) { + LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." , + fraction , relativeMemSize >> 20); + } else { + LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " + + "memory will be allocated lazily." , fraction , relativeMemSize >> 20); + } + memorySize = relativeMemSize; + } else if (memType == MemoryType.OFF_HEAP) { + // The maximum heap memory has been adjusted according to the fraction + long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory(); + long directMemorySize = (long) (maxMemory / (1.0 - fraction) * fraction); + if (preAllocateMemory) { + LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." 
, + fraction, directMemorySize >> 20); + } else { + LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," + + " memory will be allocated lazily.", fraction, directMemorySize >> 20); + } + memorySize = directMemorySize; + } else { + throw new RuntimeException("No supported memory type detected."); + } + } + + // now start the memory manager + final MemoryManager memoryManager; + try { + memoryManager = new MemoryManager( + memorySize, + taskExecutorConfig.getNumberOfSlots(), + taskExecutorConfig.getNetworkConfig().networkBufferSize(), + memType, + preAllocateMemory); + } catch (OutOfMemoryError e) { + if (memType == MemoryType.HEAP) { + throw new Exception("OutOfMemory error (" + e.getMessage() + + ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e); + } else if (memType == MemoryType.OFF_HEAP) { + throw new Exception("OutOfMemory error (" + e.getMessage() + + ") while allocating the TaskManager off-heap memory (" + memorySize + + " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e); + } else { + throw e; + } + } + + // start the I/O manager, it will create some temp directories. + final IOManager ioManager = new IOManagerAsync(taskExecutorConfig.getTmpDirPaths()); + + final TaskExecutor taskExecutor = new TaskExecutor( + taskExecutorConfig, + resourceID, + memoryManager, + ioManager, + network, + taskExecutorConfig.getNumberOfSlots(), + rpcService, + haServices); + + return taskExecutor; + } + + // -------------------------------------------------------------------------- + // Parsing and checking the TaskManager Configuration + // -------------------------------------------------------------------------- + + /** + * Utility method to extract TaskManager config parameters from the configuration and to + * sanity check them. + * + * @param configuration The configuration. + * @param taskManagerHostname The host name under which the TaskManager communicates. 
+ * @param localTaskManagerCommunication True, to skip initializing the network stack. + * Use only in cases where only one task manager runs. + * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc. + */ + private static TaskExecutorConfiguration parseTaskManagerConfiguration( + Configuration configuration, + String taskManagerHostname, + boolean localTaskManagerCommunication) throws Exception { + + // ------- read values from the config and check them --------- + // (a lot of them) + + // ----> hosts / ports for communication and data exchange + + int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT); + if (dataport == 0) { + dataport = NetUtils.getAvailablePort(); + } + checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, + "Leave config parameter empty or use 0 to let the system choose a port automatically."); + + InetAddress taskManagerAddress = InetAddress.getByName(taskManagerHostname); + final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(taskManagerAddress, dataport); + + // ----> memory / network stack (shuffles/broadcasts), task slots, temp directories + + // we need this because many configs have been written with a "-1" entry + int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); + if (slots == -1) { + slots = 1; + } + checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, + "Number of task slots must be at least one."); + + final int numNetworkBuffers = configuration.getInteger( + ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS); + checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers, + ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, ""); + + final int pageSize = configuration.getInteger( + 
ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE); + // check page size of for minimum size + checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize, + ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, + "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE); + // check page size for power of two + checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize, + ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, + "Memory segment size must be a power of 2."); + + // check whether we use heap or off-heap memory + final MemoryType memType; + if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) { + memType = MemoryType.OFF_HEAP; + } else { + memType = MemoryType.HEAP; + } + + // initialize the memory segment factory accordingly + if (memType == MemoryType.HEAP) { + if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) { + throw new Exception("Memory type is set to heap memory, but memory segment " + + "factory has been initialized for off-heap memory segments"); + } + } else { + if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) { + throw new Exception("Memory type is set to off-heap memory, but memory segment " + + "factory has been initialized for heap memory segments"); + } + } + + final String[] tmpDirs = configuration.getString( + ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator); + + final NettyConfig nettyConfig; + if (!localTaskManagerCommunication) { + nettyConfig = new NettyConfig(connectionInfo.address(), connectionInfo.dataPort(), pageSize, slots, configuration); + } else { + nettyConfig = null; + } + + // Default spill I/O mode for intermediate results + final String syncOrAsync = configuration.getString( + ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE, + 
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE); + + final IOMode ioMode; + if (syncOrAsync.equals("async")) { + ioMode = IOManager.IOMode.ASYNC; + } else { + ioMode = IOManager.IOMode.SYNC; + } + + final int queryServerPort = configuration.getInteger( + ConfigConstants.QUERYABLE_STATE_SERVER_PORT, + ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT); + + final int queryServerNetworkThreads = configuration.getInteger( + ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS, + ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS); + + final int queryServerQueryThreads = configuration.getInteger( + ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS, + ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS); + + final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration( + numNetworkBuffers, + pageSize, + memType, + ioMode, + queryServerPort, + queryServerNetworkThreads, + queryServerQueryThreads, + localTaskManagerCommunication ? 
Option.<NettyConfig>empty() : new Some<>(nettyConfig), + new Tuple2<>(500, 3000)); + + // ----> timeouts, library caching, profiling + + final FiniteDuration timeout; + try { + timeout = AkkaUtils.getTimeout(configuration); + } catch (Exception e) { + throw new IllegalArgumentException( + "Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT + + "'.Use formats like '50 s' or '1 min' to specify the timeout."); + } + LOG.info("Messages between TaskManager and JobManager have a max timeout of " + timeout); + + final long cleanupInterval = configuration.getLong( + ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, + ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000; + + final FiniteDuration finiteRegistrationDuration; + try { + Duration maxRegistrationDuration = Duration.create(configuration.getString( + ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, + ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION)); + if (maxRegistrationDuration.isFinite()) { + finiteRegistrationDuration = new FiniteDuration(maxRegistrationDuration.toSeconds(), TimeUnit.SECONDS); + } else { + finiteRegistrationDuration = null; + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid format for parameter " + + ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e); + } + + final FiniteDuration initialRegistrationPause; + try { + Duration pause = Duration.create(configuration.getString( + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, + ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE)); + if (pause.isFinite()) { + initialRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS); + } else { + throw new IllegalArgumentException("The initial registration pause must be finite: " + pause); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid format for parameter " + + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); + } + + final 
FiniteDuration maxRegistrationPause; + try { + Duration pause = Duration.create(configuration.getString( + ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE, + ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE)); + if (pause.isFinite()) { + maxRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS); + } else { + throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid format for parameter " + + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); + } + + final FiniteDuration refusedRegistrationPause; + try { + Duration pause = Duration.create(configuration.getString( + ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, + ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE)); + if (pause.isFinite()) { + refusedRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS); + } else { + throw new IllegalArgumentException("The refused registration pause must be finite: " + pause); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid format for parameter " + + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); + } + + return new TaskExecutorConfiguration( + tmpDirs, + cleanupInterval, + connectionInfo, + networkConfig, + timeout, + finiteRegistrationDuration, + slots, + configuration, + initialRegistrationPause, + maxRegistrationPause, + refusedRegistrationPause); + } + + /** + * Validates a condition for a config parameter and displays a standard exception, if the + * the condition does not hold. + * + * @param condition The condition that must hold. If the condition is false, an exception is thrown. + * @param parameter The parameter value. Will be shown in the exception message. + * @param name The name of the config parameter. Will be shown in the exception message. 
+ * @param errorMessage The optional custom error message to append to the exception message. + */ + private static void checkConfigParameter( + boolean condition, + Object parameter, + String name, + String errorMessage) { + if (!condition) { + throw new IllegalConfigurationException("Invalid configuration value for " + name + " : " + parameter + " - " + errorMessage); + } + } + + /** + * Validates that all the directories denoted by the strings do actually exist, are proper + * directories (not files), and are writable. + * + * @param tmpDirs The array of directory paths to check. + * @throws Exception Thrown if any of the directories does not exist or is not writable + * or is a file, rather than a directory. + */ + private static void checkTempDirs(String[] tmpDirs) throws IOException { + for (String dir : tmpDirs) { + if (dir != null && !dir.equals("")) { + File file = new File(dir); + if (!file.exists()) { + throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist."); + } + if (!file.isDirectory()) { + throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not a directory."); + } + if (!file.canWrite()) { + throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not writable."); + } + + if (LOG.isInfoEnabled()) { + long totalSpaceGb = file.getTotalSpace() >> 30; + long usableSpaceGb = file.getUsableSpace() >> 30; + double usablePercentage = (double)usableSpaceGb / totalSpaceGb * 100; + String path = file.getAbsolutePath(); + LOG.info(String.format("Temporary file directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)", + path, totalSpaceGb, usableSpaceGb, usablePercentage)); + } + } else { + throw new IllegalArgumentException("Temporary file directory #$id is null."); + } + } + } + + // ------------------------------------------------------------------------ + // Properties + // ------------------------------------------------------------------------ + + public 
ResourceID getResourceID() { + return resourceID; + } + + // ------------------------------------------------------------------------ + // Error Handling + // ------------------------------------------------------------------------ + + /** + * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed. + * This method should be used when asynchronous threads want to notify the + * TaskExecutor of a fatal error. + * + * @param t The exception describing the fatal error + */ + void onFatalErrorAsync(final Throwable t) { + runAsync(new Runnable() { + @Override + public void run() { + onFatalError(t); + } + }); + } + + /** + * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed. + * This method must only be called from within the TaskExecutor's main thread. + * + * @param t The exception describing the fatal error + */ + void onFatalError(Throwable t) { + // to be determined, probably delegate to a fatal error handler that + // would either log (mini cluster) ot kill the process (yarn, mesos, ...) 
+ log.error("FATAL ERROR", t); + } + + // ------------------------------------------------------------------------ + // Access to fields for testing + // ------------------------------------------------------------------------ + + @VisibleForTesting + TaskExecutorToResourceManagerConnection getResourceManagerConnection() { + return resourceManagerConnection; + } + + // ------------------------------------------------------------------------ + // Utility classes + // ------------------------------------------------------------------------ + + /** + * The listener for leader changes of the resource manager + */ + private class ResourceManagerLeaderListener implements LeaderRetrievalListener { + + @Override + public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) { + getSelf().notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID); + } + + @Override + public void handleError(Exception exception) { + onFatalErrorAsync(exception); + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java new file mode 100644 index 0000000..3707a47 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; + +import scala.concurrent.duration.FiniteDuration; + +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link TaskExecutor} Configuration + */ +public class TaskExecutorConfiguration implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String[] tmpDirPaths; + + private final long cleanupInterval; + + private final int numberOfSlots; + + private final Configuration configuration; + + private final FiniteDuration timeout; + private final FiniteDuration maxRegistrationDuration; + private final FiniteDuration initialRegistrationPause; + private final FiniteDuration maxRegistrationPause; + private final FiniteDuration refusedRegistrationPause; + + private final NetworkEnvironmentConfiguration networkConfig; + + private final InstanceConnectionInfo connectionInfo; + + public TaskExecutorConfiguration( + String[] tmpDirPaths, + long cleanupInterval, + InstanceConnectionInfo connectionInfo, + NetworkEnvironmentConfiguration networkConfig, + FiniteDuration timeout, + FiniteDuration maxRegistrationDuration, + int numberOfSlots, + Configuration configuration) { + + this (tmpDirPaths, + cleanupInterval, + connectionInfo, + networkConfig, + timeout, + maxRegistrationDuration, + numberOfSlots, + configuration, + new FiniteDuration(500, TimeUnit.MILLISECONDS), + new FiniteDuration(30, TimeUnit.SECONDS), + new FiniteDuration(10, TimeUnit.SECONDS)); + } + + public TaskExecutorConfiguration( + String[] tmpDirPaths, + long cleanupInterval, + InstanceConnectionInfo connectionInfo, + NetworkEnvironmentConfiguration networkConfig, + FiniteDuration timeout, + FiniteDuration maxRegistrationDuration, 
+ int numberOfSlots, + Configuration configuration, + FiniteDuration initialRegistrationPause, + FiniteDuration maxRegistrationPause, + FiniteDuration refusedRegistrationPause) { + + this.tmpDirPaths = checkNotNull(tmpDirPaths); + this.cleanupInterval = checkNotNull(cleanupInterval); + this.connectionInfo = checkNotNull(connectionInfo); + this.networkConfig = checkNotNull(networkConfig); + this.timeout = checkNotNull(timeout); + this.maxRegistrationDuration = maxRegistrationDuration; + this.numberOfSlots = checkNotNull(numberOfSlots); + this.configuration = checkNotNull(configuration); + this.initialRegistrationPause = checkNotNull(initialRegistrationPause); + this.maxRegistrationPause = checkNotNull(maxRegistrationPause); + this.refusedRegistrationPause = checkNotNull(refusedRegistrationPause); + } + + // -------------------------------------------------------------------------------------------- + // Properties + // -------------------------------------------------------------------------------------------- + + public String[] getTmpDirPaths() { + return tmpDirPaths; + } + + public long getCleanupInterval() { + return cleanupInterval; + } + + public InstanceConnectionInfo getConnectionInfo() { return connectionInfo; } + + public NetworkEnvironmentConfiguration getNetworkConfig() { return networkConfig; } + + public FiniteDuration getTimeout() { + return timeout; + } + + public FiniteDuration getMaxRegistrationDuration() { + return maxRegistrationDuration; + } + + public int getNumberOfSlots() { + return numberOfSlots; + } + + public Configuration getConfiguration() { + return configuration; + } + + public FiniteDuration getInitialRegistrationPause() { + return initialRegistrationPause; + } + + public FiniteDuration getMaxRegistrationPause() { + return maxRegistrationPause; + } + + public FiniteDuration getRefusedRegistrationPause() { + return refusedRegistrationPause; + } + +} + 
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java new file mode 100644 index 0000000..6c99706 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.flink.runtime.taskexecutor;

import org.apache.flink.runtime.rpc.RpcGateway;

import java.util.UUID;

/**
 * {@link TaskExecutor} RPC gateway interface
 */
public interface TaskExecutorGateway extends RpcGateway {

	// ------------------------------------------------------------------------
	//  ResourceManager handlers
	// ------------------------------------------------------------------------

	/**
	 * Notifies the TaskExecutor about a ResourceManager leader. The TaskExecutor's leader
	 * retrieval listener forwards the leader address and leader session ID it was given
	 * to this method.
	 *
	 * @param address                 The address of the ResourceManager leader.
	 * @param resourceManagerLeaderId The leader session ID of the ResourceManager leader.
	 */
	void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
new file mode 100644
index 0000000..b357f52
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
@@ -0,0 +1,75 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.registration.RegistrationResponse; + +import java.io.Serializable; + +/** + * Base class for responses from the ResourceManager to a registration attempt by a + * TaskExecutor. + */ +public final class TaskExecutorRegistrationSuccess extends RegistrationResponse.Success implements Serializable { + + private static final long serialVersionUID = 1L; + + private final InstanceID registrationId; + + private final long heartbeatInterval; + + /** + * Create a new {@code TaskExecutorRegistrationSuccess} message. + * + * @param registrationId The ID that the ResourceManager assigned the registration. + * @param heartbeatInterval The interval in which the ResourceManager will heartbeat the TaskExecutor. + */ + public TaskExecutorRegistrationSuccess(InstanceID registrationId, long heartbeatInterval) { + this.registrationId = registrationId; + this.heartbeatInterval = heartbeatInterval; + } + + /** + * Gets the ID that the ResourceManager assigned the registration. + */ + public InstanceID getRegistrationId() { + return registrationId; + } + + /** + * Gets the interval in which the ResourceManager will heartbeat the TaskExecutor. 
+ */ + public long getHeartbeatInterval() { + return heartbeatInterval; + } + + @Override + public String toString() { + return "TaskExecutorRegistrationSuccess (" + registrationId + " / " + heartbeatInterval + ')'; + } + +} + + + + + + + http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java new file mode 100644 index 0000000..25332a0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import akka.dispatch.OnFailure; +import akka.dispatch.OnSuccess; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.registration.RetryingRegistration; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; + +import org.slf4j.Logger; + +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The connection between a TaskExecutor and the ResourceManager. + */ +public class TaskExecutorToResourceManagerConnection { + + /** the logger for all log messages of this class */ + private final Logger log; + + /** the TaskExecutor whose connection to the ResourceManager this represents */ + private final TaskExecutor taskExecutor; + + private final UUID resourceManagerLeaderId; + + private final String resourceManagerAddress; + + private TaskExecutorToResourceManagerConnection.ResourceManagerRegistration pendingRegistration; + + private ResourceManagerGateway registeredResourceManager; + + private InstanceID registrationId; + + /** flag indicating that the connection is closed */ + private volatile boolean closed; + + + public TaskExecutorToResourceManagerConnection( + Logger log, + TaskExecutor taskExecutor, + String resourceManagerAddress, + UUID resourceManagerLeaderId) { + + this.log = checkNotNull(log); + this.taskExecutor = checkNotNull(taskExecutor); + this.resourceManagerAddress = checkNotNull(resourceManagerAddress); + this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId); + } + + // 
------------------------------------------------------------------------ + // Life cycle + // ------------------------------------------------------------------------ + + @SuppressWarnings("unchecked") + public void start() { + checkState(!closed, "The connection is already closed"); + checkState(!isRegistered() && pendingRegistration == null, "The connection is already started"); + + pendingRegistration = new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration( + log, taskExecutor.getRpcService(), + resourceManagerAddress, resourceManagerLeaderId, + taskExecutor.getAddress(), taskExecutor.getResourceID()); + pendingRegistration.startRegistration(); + + Future<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>> future = pendingRegistration.getFuture(); + + future.onSuccess(new OnSuccess<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>>() { + @Override + public void onSuccess(Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess> result) { + registeredResourceManager = result.f0; + registrationId = result.f1.getRegistrationId(); + } + }, taskExecutor.getMainThreadExecutionContext()); + + // this future should only ever fail if there is a bug, not if the registration is declined + future.onFailure(new OnFailure() { + @Override + public void onFailure(Throwable failure) { + taskExecutor.onFatalError(failure); + } + }, taskExecutor.getMainThreadExecutionContext()); + } + + public void close() { + closed = true; + + // make sure we do not keep re-trying forever + if (pendingRegistration != null) { + pendingRegistration.cancel(); + } + } + + public boolean isClosed() { + return closed; + } + + // ------------------------------------------------------------------------ + // Properties + // ------------------------------------------------------------------------ + + public UUID getResourceManagerLeaderId() { + return resourceManagerLeaderId; + } + + public String getResourceManagerAddress() { + return 
resourceManagerAddress; + } + + /** + * Gets the ResourceManagerGateway. This returns null until the registration is completed. + */ + public ResourceManagerGateway getResourceManager() { + return registeredResourceManager; + } + + /** + * Gets the ID under which the TaskExecutor is registered at the ResourceManager. + * This returns null until the registration is completed. + */ + public InstanceID getRegistrationId() { + return registrationId; + } + + public boolean isRegistered() { + return registeredResourceManager != null; + } + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return String.format("Connection to ResourceManager %s (leaderId=%s)", + resourceManagerAddress, resourceManagerLeaderId); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + private static class ResourceManagerRegistration + extends RetryingRegistration<ResourceManagerGateway, TaskExecutorRegistrationSuccess> { + + private final String taskExecutorAddress; + + private final ResourceID resourceID; + + ResourceManagerRegistration( + Logger log, + RpcService rpcService, + String targetAddress, + UUID leaderId, + String taskExecutorAddress, + ResourceID resourceID) { + + super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, leaderId); + this.taskExecutorAddress = checkNotNull(taskExecutorAddress); + this.resourceID = checkNotNull(resourceID); + } + + @Override + protected Future<RegistrationResponse> invokeRegistration( + ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception { + + FiniteDuration timeout = new FiniteDuration(timeoutMillis, TimeUnit.MILLISECONDS); + return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, timeout); + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java new file mode 100644 index 0000000..744308c --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.flink.runtime.clusterframework;

import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.testingUtils.TestingMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingResourceManager;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.Option;


/**
 * Runs tests to ensure that a cluster is shutdown properly.
 *
 * <p>Both tests share one actor system, which is created before and shut down after the
 * test class. In each test the test actor sends a {@link StopCluster} message to the
 * JobManager and then expects a shutdown notification from every started component plus
 * the {@link StopClusterSuccessful} reply.
 */
public class ClusterShutdownITCase extends TestLogger {

	/** The actor system shared by all tests of this class. */
	private static ActorSystem system;

	/** The (empty) configuration used to start all components. */
	private static Configuration config = new Configuration();

	@BeforeClass
	public static void setup() {
		system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
	}

	@AfterClass
	public static void teardown() {
		JavaTestKit.shutdownActorSystem(system);
	}

	/**
	 * Tests a faked cluster shutdown procedure without the ResourceManager.
	 */
	@Test
	public void testClusterShutdownWithoutResourceManager() {

		new JavaTestKit(system){{
		new Within(duration("30 seconds")) {
		@Override
		protected void run() {

			// gateway that forwards everything it receives to the test actor
			ActorGateway me =
				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());

			// start job manager which doesn't shutdown the actor system
			ActorGateway jobManager =
				TestingUtils.createJobManager(system, config, "jobmanager1");

			// Tell the JobManager to inform us of shutdown actions
			jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);

			// Register a TaskManager
			ActorGateway taskManager =
				TestingUtils.createTaskManager(system, jobManager, config, true, true);

			// Tell the TaskManager to inform us of TaskManager shutdowns
			taskManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);


			// No resource manager connected
			jobManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me);

			// both components must report their shutdown and the stop request must be confirmed
			expectMsgAllOf(
				new TestingMessages.ComponentShutdown(taskManager.actor()),
				new TestingMessages.ComponentShutdown(jobManager.actor()),
				StopClusterSuccessful.getInstance()
			);

		}};
		}};
	}

	/**
	 * Tests a faked cluster shutdown procedure with the ResourceManager.
	 */
	@Test
	public void testClusterShutdownWithResourceManager() {

		new JavaTestKit(system){{
		new Within(duration("30 seconds")) {
		@Override
		protected void run() {

			// gateway that forwards everything it receives to the test actor
			ActorGateway me =
				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());

			// start job manager which doesn't shutdown the actor system
			ActorGateway jobManager =
				TestingUtils.createJobManager(system, config, "jobmanager2");

			// Tell the JobManager to inform us of shutdown actions
			jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);

			// Register a TaskManager
			ActorGateway taskManager =
				TestingUtils.createTaskManager(system, jobManager, config, true, true);

			// Tell the TaskManager to inform us of TaskManager shutdowns
			taskManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);

			// Start resource manager and let it register
			ActorGateway resourceManager =
				TestingUtils.createResourceManager(system, jobManager.actor(), config);

			// Tell the ResourceManager to inform us of ResourceManager shutdowns
			resourceManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);

			// notify about a resource manager registration at the job manager
			resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);

			// Wait for resource manager
			expectMsgEquals(Messages.getAcknowledge());


			// Shutdown cluster with resource manager connected
			jobManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me);

			// all three components must report their shutdown and the stop request must be confirmed
			expectMsgAllOf(
				new TestingMessages.ComponentShutdown(taskManager.actor()),
				new TestingMessages.ComponentShutdown(jobManager.actor()),
				new TestingMessages.ComponentShutdown(resourceManager.actor()),
				StopClusterSuccessful.getInstance()
			);

		}};
		}};
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java new file mode 100644 index 0000000..1565dc3 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.flink.runtime.clusterframework;

import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingResourceManager;
import org.apache.flink.util.TestLogger;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import scala.Option;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * IT cases which test the interaction of the resource manager with job manager and task managers.
 * Runs all tests in one Actor system.
 */
public class ResourceManagerITCase extends TestLogger {

	/** The actor system shared by all tests of this class. */
	private static ActorSystem system;

	/** The (empty) configuration used to start all components. */
	private static Configuration config = new Configuration();

	@BeforeClass
	public static void setup() {
		system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
	}

	@AfterClass
	public static void teardown() {
		JavaTestKit.shutdownActorSystem(system);
	}

	/**
	 * Tests whether the resource manager connects and reconciles existing task managers.
	 *
	 * <p>A task manager registration is sent to the job manager before any resource manager
	 * exists; the resource manager started afterwards must report exactly that resource.
	 */
	@Test
	public void testResourceManagerReconciliation() {

		new JavaTestKit(system){{
		new Within(duration("10 seconds")) {
		@Override
		protected void run() {

			ActorGateway jobManager =
				TestingUtils.createJobManager(system, config, "ReconciliationTest");
			// gateway that forwards everything it receives to the test actor
			ActorGateway me =
				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());

			// !! no resource manager started !!

			ResourceID resourceID = ResourceID.generate();

			// mocked location that only answers the resource ID lookup
			TaskManagerLocation location = mock(TaskManagerLocation.class);
			when(location.getResourceID()).thenReturn(resourceID);

			HardwareDescription resourceProfile = HardwareDescription.extractFromSystem(1_000_000);

			// register a (faked) task manager directly at the job manager
			jobManager.tell(
				new RegistrationMessages.RegisterTaskManager(resourceID, location, resourceProfile, 1),
				me);

			expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class);

			// now start the resource manager
			ActorGateway resourceManager =
				TestingUtils.createResourceManager(system, jobManager.actor(), config);

			// register at testing job manager to receive a message once a resource manager registers
			resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);

			// Wait for resource manager
			expectMsgEquals(Messages.getAcknowledge());

			// check if we registered the task manager resource
			resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), me);

			TestingResourceManager.GetRegisteredResourcesReply reply =
				expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);

			assertEquals(1, reply.resources.size());
			assertTrue(reply.resources.contains(resourceID));

		}};
		}};
	}

	/**
	 * Tests whether the resource manager gets informed upon TaskManager registration.
	 *
	 * <p>Here the resource manager is connected first and the task manager is started
	 * afterwards; the resource manager must then report one registered resource.
	 */
	@Test
	public void testResourceManagerTaskManagerRegistration() {

		new JavaTestKit(system){{
		new Within(duration("30 seconds")) {
		@Override
		protected void run() {

			ActorGateway jobManager =
				TestingUtils.createJobManager(system, config, "RegTest");
			// gateway that forwards everything it receives to the test actor
			ActorGateway me =
				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());

			// start the resource manager
			ActorGateway resourceManager =
				TestingUtils.createResourceManager(system, jobManager.actor(), config);

			// notify about a resource manager registration at the job manager
			resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);

			// Wait for resource manager
			expectMsgEquals(Messages.getAcknowledge());

			// start task manager and wait for registration
			// (the returned gateway is not used by the test afterwards)
			ActorGateway taskManager =
				TestingUtils.createTaskManager(system, jobManager.actor(), config, true, true);

			// check if we registered the task manager resource
			resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), me);

			TestingResourceManager.GetRegisteredResourcesReply reply =
				expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);

			assertEquals(1, reply.resources.size());

		}};
		}};
	}

}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
new file mode 100644
index 0000000..ca8a07a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -0,0 +1,338 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; +import org.apache.flink.runtime.clusterframework.messages.RemoveResource; +import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved; +import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.TestingResourceManager; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import scala.Option; + +import java.util.ArrayList; 
+import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.*; + +/** + * General tests for the resource manager component. + */ +public class ResourceManagerTest { + + private static ActorSystem system; + + private static ActorGateway fakeJobManager; + private static ActorGateway resourceManager; + + private static Configuration config = new Configuration(); + + @BeforeClass + public static void setup() { + system = AkkaUtils.createLocalActorSystem(config); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + + /** + * Tests the registration and reconciliation of the ResourceManager with the JobManager + */ + @Test + public void testJobManagerRegistrationAndReconciliation() { + new JavaTestKit(system){{ + new Within(duration("10 seconds")) { + @Override + protected void run() { + fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); + resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config); + + expectMsgClass(RegisterResourceManager.class); + + List<ResourceID> resourceList = new ArrayList<>(); + resourceList.add(ResourceID.generate()); + resourceList.add(ResourceID.generate()); + resourceList.add(ResourceID.generate()); + + resourceManager.tell( + new RegisterResourceManagerSuccessful(fakeJobManager.actor(), resourceList), + fakeJobManager); + + resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager); + TestingResourceManager.GetRegisteredResourcesReply reply = + expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class); + + for (ResourceID id : resourceList) { + if (!reply.resources.contains(id)) { + fail("Expected to find all resources that were provided during registration."); + } + } + }}; + }}; + } + + /** + * Tests delayed or erroneous registration of the ResourceManager with the JobManager + */ + @Test + public void 
testDelayedJobManagerRegistration() { + new JavaTestKit(system){{ + new Within(duration("10 seconds")) { + @Override + protected void run() { + + // set a short timeout for lookups + Configuration shortTimeoutConfig = config.clone(); + shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "1 s"); + + fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); + resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), shortTimeoutConfig); + + // wait for registration message + RegisterResourceManager msg = expectMsgClass(RegisterResourceManager.class); + // give wrong response + getLastSender().tell(new JobManagerMessages.LeaderSessionMessage(null, new Object()), + fakeJobManager.actor()); + + // expect another retry and let it time out + expectMsgClass(RegisterResourceManager.class); + + // wait for next try after timeout + expectMsgClass(RegisterResourceManager.class); + + }}; + }}; + } + + @Test + public void testTriggerReconnect() { + new JavaTestKit(system){{ + new Within(duration("10 seconds")) { + @Override + protected void run() { + + // set a long timeout for lookups such that the test fails in case of timeouts + Configuration shortTimeoutConfig = config.clone(); + shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "99999 s"); + + fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); + resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), shortTimeoutConfig); + + // wait for registration message + RegisterResourceManager msg = expectMsgClass(RegisterResourceManager.class); + // all went well + resourceManager.tell( + new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()), + fakeJobManager); + + // force a reconnect + resourceManager.tell( + new TriggerRegistrationAtJobManager(fakeJobManager.actor()), + fakeJobManager); + + // new registration 
attempt should come in + expectMsgClass(RegisterResourceManager.class); + + }}; + }}; + } + + /** + * Tests the registration and accounting of resources at the ResourceManager. + */ + @Test + public void testTaskManagerRegistration() { + new JavaTestKit(system){{ + new Within(duration("10 seconds")) { + @Override + protected void run() { + + fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); + resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config); + + // register with JM + expectMsgClass(RegisterResourceManager.class); + resourceManager.tell( + new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()), + fakeJobManager); + + ResourceID resourceID = ResourceID.generate(); + + // Send task manager registration + resourceManager.tell(new NotifyResourceStarted(resourceID), + fakeJobManager); + + expectMsgClass(Acknowledge.class); + + // check for number registration of registered resources + resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager); + TestingResourceManager.GetRegisteredResourcesReply reply = + expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class); + + assertEquals(1, reply.resources.size()); + + // Send task manager registration again + resourceManager.tell(new NotifyResourceStarted(resourceID), + fakeJobManager); + + expectMsgClass(Acknowledge.class); + + // check for number registration of registered resources + resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager); + reply = expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class); + + assertEquals(1, reply.resources.size()); + + // Send invalid null resource id to throw an exception during resource registration + resourceManager.tell(new NotifyResourceStarted(null), + fakeJobManager); + + expectMsgClass(Acknowledge.class); + + // check for number registration of 
registered resources + resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager); + reply = expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class); + + assertEquals(1, reply.resources.size()); + }}; + }}; + } + + @Test + public void testResourceRemoval() { + new JavaTestKit(system){{ + new Within(duration("10 seconds")) { + @Override + protected void run() { + + fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); + resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config); + + // register with JM + expectMsgClass(RegisterResourceManager.class); + resourceManager.tell( + new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()), + fakeJobManager); + + ResourceID resourceID = ResourceID.generate(); + + // remove unknown resource + resourceManager.tell(new RemoveResource(resourceID), fakeJobManager); + + // Send task manager registration + resourceManager.tell(new NotifyResourceStarted(resourceID), + fakeJobManager); + + expectMsgClass(Acknowledge.class); + + // check for number registration of registered resources + resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager); + TestingResourceManager.GetRegisteredResourcesReply reply = + expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class); + + assertEquals(1, reply.resources.size()); + assertTrue(reply.resources.contains(resourceID)); + + // remove resource + resourceManager.tell(new RemoveResource(resourceID), fakeJobManager); + + // check for number registration of registered resources + resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager); + reply = expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class); + + assertEquals(0, reply.resources.size()); + + }}; + }}; + } + + /** + * Tests notification of JobManager about a failed resource. 
+ */ + @Test + public void testResourceFailureNotification() { + new JavaTestKit(system){{ + new Within(duration("10 seconds")) { + @Override + protected void run() { + + fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); + resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config); + + // register with JM + expectMsgClass(RegisterResourceManager.class); + resourceManager.tell( + new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()), + fakeJobManager); + + ResourceID resourceID1 = ResourceID.generate(); + ResourceID resourceID2 = ResourceID.generate(); + + // Send task manager registration + resourceManager.tell(new NotifyResourceStarted(resourceID1), + fakeJobManager); + + expectMsgClass(Acknowledge.class); + + // Send task manager registration + resourceManager.tell(new NotifyResourceStarted(resourceID2), + fakeJobManager); + + expectMsgClass(Acknowledge.class); + + // check for number registration of registered resources + resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager); + TestingResourceManager.GetRegisteredResourcesReply reply = + expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class); + + assertEquals(2, reply.resources.size()); + assertTrue(reply.resources.contains(resourceID1)); + assertTrue(reply.resources.contains(resourceID2)); + + // fail resources + resourceManager.tell(new TestingResourceManager.FailResource(resourceID1), fakeJobManager); + resourceManager.tell(new TestingResourceManager.FailResource(resourceID2), fakeJobManager); + + ResourceRemoved answer = expectMsgClass(ResourceRemoved.class); + ResourceRemoved answer2 = expectMsgClass(ResourceRemoved.class); + + assertEquals(resourceID1, answer.resourceId()); + assertEquals(resourceID2, answer2.resourceId()); + + }}; + }}; + } +}