[FLINK-4489] [tm] Add TaskSlotTable to manage slot allocations for multiple job managers
Add TimerService for slot timeouts Add task and task slot access methods Add comments to newly introduced classes This closes #2638. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f3adc9f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f3adc9f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f3adc9f Branch: refs/heads/flip-6 Commit: 5f3adc9f9d21ff80726f885c751c071a90318aa4 Parents: fe999e0 Author: Till Rohrmann <[email protected]> Authored: Wed Oct 5 11:58:26 2016 +0200 Committer: Till Rohrmann <[email protected]> Committed: Thu Oct 20 19:49:24 2016 +0200 ---------------------------------------------------------------------- .../runtime/clusterframework/types/SlotID.java | 4 +- .../resourcemanager/ResourceManager.java | 13 +- .../resourcemanager/ResourceManagerGateway.java | 13 +- .../runtime/taskexecutor/TaskExecutor.java | 194 +++--- .../runtime/taskexecutor/TaskManagerRunner.java | 1 + .../taskexecutor/TaskManagerServices.java | 29 +- .../TaskManagerServicesConfiguration.java | 2 + .../flink/runtime/taskexecutor/TaskSlot.java | 73 -- .../runtime/taskexecutor/TaskSlotMapping.java | 44 -- .../runtime/taskexecutor/slot/SlotActions.java | 45 ++ .../slot/SlotNotActiveException.java | 34 + .../slot/SlotNotFoundException.java | 37 + .../runtime/taskexecutor/slot/TaskSlot.java | 289 ++++++++ .../taskexecutor/slot/TaskSlotState.java | 29 + .../taskexecutor/slot/TaskSlotTable.java | 682 +++++++++++++++++++ .../taskexecutor/slot/TimeoutListener.java | 37 + .../runtime/taskexecutor/slot/TimerService.java | 160 +++++ .../apache/flink/runtime/taskmanager/Task.java | 8 +- .../PartialConsumePipelinedResultTest.java | 1 - .../runtime/taskexecutor/TaskExecutorTest.java | 4 + 20 files changed, 1457 insertions(+), 242 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java index 237597b..d6409b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework.types; import java.io.Serializable; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -34,8 +35,9 @@ public class SlotID implements ResourceIDRetrievable, Serializable { /** The numeric id for single slot */ private final int slotNumber; - + public SlotID(ResourceID resourceId, int slotNumber) { + checkArgument(0 <= slotNumber, "Slot number must be positive."); this.resourceId = checkNotNull(resourceId, "ResourceID must not be null"); this.slotNumber = slotNumber; } http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 8fbb34b..3122804 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -40,7 +40,6 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected; import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.SlotAvailableReply; import org.apache.flink.runtime.resourcemanager.registration.JobMasterRegistration; import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; @@ -337,34 +336,32 @@ public abstract class ResourceManager<WorkerType extends Serializable> /** * Notification from a TaskExecutor that a slot has become available * @param resourceManagerLeaderId TaskExecutor's resource manager leader id - * @param resourceID TaskExecutor's resource id * @param instanceID TaskExecutor's instance id * @param slotID The slot id of the available slot * @return SlotAvailableReply */ @RpcMethod - public SlotAvailableReply notifySlotAvailable( + public void notifySlotAvailable( final UUID resourceManagerLeaderId, - final ResourceID resourceID, final InstanceID instanceID, final SlotID slotID) { if (resourceManagerLeaderId.equals(leaderSessionID)) { - WorkerRegistration<WorkerType> registration = taskExecutors.get(resourceID); + final ResourceID resourceId = slotID.getResourceID(); + WorkerRegistration<WorkerType> registration = taskExecutors.get(resourceId); + if (registration != null) { InstanceID registrationInstanceID = registration.getInstanceID(); if (registrationInstanceID.equals(instanceID)) { runAsync(new Runnable() { @Override public void run() { - slotManager.notifySlotAvailable(resourceID, slotID); + slotManager.notifySlotAvailable(resourceId, slotID); } }); - return new SlotAvailableReply(leaderSessionID, slotID); } } } - return null; } 
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index 07e9e43..968eeb8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.SlotAvailableReply; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.jobmaster.JobMaster; @@ -94,17 +93,13 @@ public interface ResourceManagerGateway extends RpcGateway { * Sent by the TaskExecutor to notify the ResourceManager that a slot has become available. 
* * @param resourceManagerLeaderId The ResourceManager leader id - * @param resourceID The ResourceID of the TaskExecutor - * @param instanceID The InstanceID of the TaskExecutor + * @param instanceId TaskExecutor's instance id * @param slotID The SlotID of the freed slot - * @return The confirmation by the ResourceManager */ - Future<SlotAvailableReply> notifySlotAvailable( + void notifySlotAvailable( UUID resourceManagerLeaderId, - ResourceID resourceID, - InstanceID instanceID, - SlotID slotID, - @RpcTimeout Time timeout); + InstanceID instanceId, + SlotID slotID); /** * Registers an infoMessage listener http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 9f9234f..e642315 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -47,6 +47,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered; @@ -64,6 +65,10 @@ import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder; import org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider; import 
org.apache.flink.runtime.taskexecutor.rpc.RpcPartitionStateChecker; import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier; +import org.apache.flink.runtime.taskexecutor.slot.SlotActions; +import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException; +import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException; +import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -76,7 +81,6 @@ import java.net.InetSocketAddress; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -132,13 +136,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { private Map<ResourceID, JobManagerConnection> jobManagerConnections; - // --------- Slot allocation table -------- + // --------- task slot allocation table ----------- - private Map<AllocationID, TaskSlot> taskSlots; - - // --------- Slot allocation table -------- - - private Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings; + private final TaskSlotTable taskSlotTable; // ------------------------------------------------------------------------ @@ -154,6 +154,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { TaskManagerMetricGroup taskManagerMetricGroup, BroadcastVariableManager broadcastVariableManager, FileCache fileCache, + TaskSlotTable taskSlotTable, FatalErrorHandler fatalErrorHandler) { super(rpcService); @@ -167,6 +168,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { this.networkEnvironment = checkNotNull(networkEnvironment); this.haServices = checkNotNull(haServices); this.metricRegistry = checkNotNull(metricRegistry); + this.taskSlotTable = checkNotNull(taskSlotTable); 
this.fatalErrorHandler = checkNotNull(fatalErrorHandler); this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup); this.broadcastVariableManager = checkNotNull(broadcastVariableManager); @@ -175,8 +177,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { this.jobManagerConnections = new HashMap<>(4); this.unconfirmedFreeSlots = new HashSet<>(); - this.taskSlots = new HashMap<>(taskManagerConfiguration.getNumberSlots()); - this.taskSlotMappings = new HashMap<>(taskManagerConfiguration.getNumberSlots() * 2); } // ------------------------------------------------------------------------ @@ -193,6 +193,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } catch (Exception e) { onFatalErrorAsync(e); } + + // tell the task slot table who's responsible for the task slot actions + taskSlotTable.start(new SlotActionsImpl(), taskManagerConfiguration.getTimeout()); } /** @@ -202,6 +205,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { public void shutDown() { log.info("Stopping TaskManager {}.", getAddress()); + taskSlotTable.stop(); + if (resourceManagerConnection.isConnected()) { try { resourceManagerConnection.close(); @@ -264,10 +269,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { throw new TaskSubmissionException(message); } - TaskSlot taskSlot = taskSlots.get(tdd.getAllocationID()); - - if (taskSlot == null) { - final String message = "No task slot allocated for allocation ID " + tdd.getAllocationID() + '.'; + if (!taskSlotTable.existActiveSlot(tdd.getJobID(), tdd.getAllocationID())) { + final String message = "No task slot allocated for job ID " + tdd.getJobID() + + " and allocation ID " + tdd.getAllocationID() + '.'; log.debug(message); throw new TaskSubmissionException(message); } @@ -307,10 +311,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks()); - 
if(taskSlot.add(task)) { - TaskSlotMapping taskSlotMapping = new TaskSlotMapping(task, taskSlot); + boolean taskAdded; - taskSlotMappings.put(task.getExecutionId(), taskSlotMapping); + try { + taskAdded = taskSlotTable.addTask(task); + } catch (SlotNotFoundException | SlotNotActiveException e) { + throw new TaskSubmissionException("Could not submit task.", e); + } + + if (taskAdded) { task.startTaskThread(); return Acknowledge.get(); @@ -325,7 +334,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { @RpcMethod public Acknowledge cancelTask(ExecutionAttemptID executionAttemptID) throws TaskException { - final Task task = getTask(executionAttemptID); + final Task task = taskSlotTable.getTask(executionAttemptID); if (task != null) { try { @@ -344,7 +353,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { @RpcMethod public Acknowledge stopTask(ExecutionAttemptID executionAttemptID) throws TaskException { - final Task task = getTask(executionAttemptID); + final Task task = taskSlotTable.getTask(executionAttemptID); if (task != null) { try { @@ -367,7 +376,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { @RpcMethod public Acknowledge updatePartitions(final ExecutionAttemptID executionAttemptID, Collection<PartitionInfo> partitionInfos) throws PartitionException { - final Task task = getTask(executionAttemptID); + final Task task = taskSlotTable.getTask(executionAttemptID); if (task != null) { for (final PartitionInfo partitionInfo: partitionInfos) { @@ -430,7 +439,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException { log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID); - final Task task = getTask(executionAttemptID); + final Task task = taskSlotTable.getTask(executionAttemptID); if (task 
!= null) { task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp); @@ -448,7 +457,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { public Acknowledge confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException { log.debug("Confirm checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID); - final Task task = getTask(executionAttemptID); + final Task task = taskSlotTable.getTask(executionAttemptID); if (task != null) { task.notifyCheckpointComplete(checkpointId); @@ -494,68 +503,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { return jobManagerConnections.get(jobManagerID); } - private Task getTask(ExecutionAttemptID executionAttemptID) { - TaskSlotMapping taskSlotMapping = taskSlotMappings.get(executionAttemptID); - - if (taskSlotMapping != null) { - return taskSlotMapping.getTask(); - } else { - return null; - } - } - - private Task removeTask(ExecutionAttemptID executionAttemptID) { - TaskSlotMapping taskSlotMapping = taskSlotMappings.remove(executionAttemptID); - - if (taskSlotMapping != null) { - final Task task = taskSlotMapping.getTask(); - final TaskSlot taskSlot = taskSlotMapping.getTaskSlot(); - - taskSlot.remove(task); - - return task; - } else { - return null; - } - } - - private Iterable<Task> getAllTasks() { - final Iterator<TaskSlotMapping> taskEntryIterator = taskSlotMappings.values().iterator(); - final Iterator<Task> iterator = new Iterator<Task>() { - @Override - public boolean hasNext() { - return taskEntryIterator.hasNext(); - } - - @Override - public Task next() { - return taskEntryIterator.next().getTask(); - } - - @Override - public void remove() { - taskEntryIterator.remove(); - } - }; - - return new Iterable<Task>() { - @Override - public Iterator<Task> iterator() { - return iterator; - } - }; - } - - private void clearTasks() { - taskSlotMappings.clear(); - - for (TaskSlot taskSlot: 
taskSlots.values()) { - taskSlot.clear(); - } - } - private void failTask(final ExecutionAttemptID executionAttemptID, final Throwable cause) { - final Task task = getTask(executionAttemptID); + final Task task = taskSlotTable.getTask(executionAttemptID); if (task != null) { try { @@ -568,18 +517,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } } - private void cancelAndClearAllTasks(Throwable cause) { - log.info("Cancellaing all computations and discarding all cached data."); - - Iterable<Task> tasks = getAllTasks(); - - for (Task task: tasks) { - task.failExternally(cause); - } - - clearTasks(); - } - private void updateTaskExecutionState( final UUID jobMasterLeaderId, final JobMasterGateway jobMasterGateway, @@ -602,11 +539,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { private void unregisterTaskAndNotifyFinalState( final UUID jobMasterLeaderId, - final JobMasterGateway jobMasterGateway, - final ExecutionAttemptID executionAttemptID) - { - Task task = removeTask(executionAttemptID); + final JobMasterGateway jobMasterGateway, + final ExecutionAttemptID executionAttemptID) { + Task task = taskSlotTable.removeTask(executionAttemptID); if (task != null) { if (!task.getExecutionState().isTerminal()) { try { @@ -718,6 +654,41 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } } + private void freeSlot(AllocationID allocationId) { + Preconditions.checkNotNull(allocationId); + + try { + int freedSlotIndex = taskSlotTable.freeSlot(allocationId); + + if (freedSlotIndex != -1 && isConnectedToResourceManager()) { + // the slot was freed. 
Tell the RM about it + ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); + + resourceManagerGateway.notifySlotAvailable( + resourceManagerConnection.getTargetLeaderId(), + resourceManagerConnection.getRegistrationId(), + new SlotID(getResourceID(), freedSlotIndex)); + } + } catch (SlotNotFoundException e) { + log.debug("Could not free slot for allocation id {}.", allocationId, e); + } + } + + private void timeoutSlot(AllocationID allocationId, UUID ticket) { + Preconditions.checkNotNull(allocationId); + Preconditions.checkNotNull(ticket); + + if (taskSlotTable.isValidTimeout(allocationId, ticket)) { + freeSlot(allocationId); + } else { + log.debug("Received an invalid timeout for allocation id {} with ticket {}.", allocationId, ticket); + } + } + + private boolean isConnectedToResourceManager() { + return (resourceManagerConnection != null && resourceManagerConnection.isConnected()); + } + // ------------------------------------------------------------------------ // Properties // ------------------------------------------------------------------------ @@ -778,7 +749,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { /** * The listener for leader changes of the resource manager */ - private class ResourceManagerLeaderListener implements LeaderRetrievalListener { + private final class ResourceManagerLeaderListener implements LeaderRetrievalListener { @Override public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) { @@ -796,7 +767,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } } - private class TaskManagerActionsImpl implements TaskManagerActions { + private final class TaskManagerActionsImpl implements TaskManagerActions { private final UUID jobMasterLeaderId; private final JobMasterGateway jobMasterGateway; @@ -837,4 +808,27 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } } + private class SlotActionsImpl 
implements SlotActions { + + @Override + public void freeSlot(final AllocationID allocationId) { + runAsync(new Runnable() { + @Override + public void run() { + TaskExecutor.this.freeSlot(allocationId); + } + }); + } + + @Override + public void timeoutSlot(final AllocationID allocationId, final UUID ticket) { + runAsync(new Runnable() { + @Override + public void run() { + TaskExecutor.this.timeoutSlot(allocationId, ticket); + } + }); + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index bb66655..ca1d2ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -98,6 +98,7 @@ public class TaskManagerRunner implements FatalErrorHandler { taskManagerServices.getTaskManagerMetricGroup(), taskManagerServices.getBroadcastVariableManager(), taskManagerServices.getFileCache(), + taskManagerServices.getTaskSlotTable(), this); } http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index e264a1c..c1728b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -20,7 
+20,9 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; @@ -38,6 +40,8 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; import org.apache.flink.runtime.query.netty.KvStateServer; +import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; +import org.apache.flink.runtime.taskexecutor.slot.TimerService; import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -48,6 +52,9 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; /** * Container for {@link TaskExecutor} services such as the {@link MemoryManager}, {@link IOManager}, @@ -65,6 +72,7 @@ public class TaskManagerServices { private final TaskManagerMetricGroup taskManagerMetricGroup; private final BroadcastVariableManager broadcastVariableManager; private final FileCache fileCache; + private final TaskSlotTable taskSlotTable; private TaskManagerServices( TaskManagerLocation taskManagerLocation, @@ -74,7 +82,8 @@ public class TaskManagerServices { MetricRegistry metricRegistry, TaskManagerMetricGroup taskManagerMetricGroup, BroadcastVariableManager 
broadcastVariableManager, - FileCache fileCache) { + FileCache fileCache, + TaskSlotTable taskSlotTable) { this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); this.memoryManager = Preconditions.checkNotNull(memoryManager); @@ -84,6 +93,7 @@ public class TaskManagerServices { this.taskManagerMetricGroup = Preconditions.checkNotNull(taskManagerMetricGroup); this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager); this.fileCache = Preconditions.checkNotNull(fileCache); + this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable); } // -------------------------------------------------------------------------------------------- @@ -121,6 +131,10 @@ public class TaskManagerServices { public FileCache getFileCache() { return fileCache; } + + public TaskSlotTable getTaskSlotTable() { + return taskSlotTable; + } // -------------------------------------------------------------------------------------------- // Static factory methods for task manager services @@ -167,6 +181,16 @@ public class TaskManagerServices { final FileCache fileCache = new FileCache(taskManagerServicesConfiguration.getTmpDirPaths()); + final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots()); + + for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) { + resourceProfiles.add(new ResourceProfile(1.0, 42L)); + } + + final TimerService<AllocationID> timerService = new TimerService<>(new ScheduledThreadPoolExecutor(1)); + + final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService); + return new TaskManagerServices( taskManagerLocation, memoryManager, @@ -175,7 +199,8 @@ public class TaskManagerServices { metricRegistry, taskManagerMetricGroup, broadcastVariableManager, - fileCache); + fileCache, + taskSlotTable); } /** 
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index 80dfc09..036a890 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -173,6 +173,8 @@ public class TaskManagerServicesConfiguration { final MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration); + + return new TaskManagerServicesConfiguration( remoteAddress, tmpDirs, http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java deleted file mode 100644 index 4fc1d66..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskexecutor; - -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.taskmanager.Task; -import org.apache.flink.util.Preconditions; - -import java.util.HashMap; -import java.util.Map; - -/** - * Container for multiple {@link Task} belonging to the same slot. - */ -public class TaskSlot { - private final AllocationID allocationID; - private final ResourceID resourceID; - private final Map<ExecutionAttemptID, Task> tasks; - - public TaskSlot(AllocationID allocationID, ResourceID resourceID) { - this.allocationID = Preconditions.checkNotNull(allocationID); - this.resourceID = Preconditions.checkNotNull(resourceID); - tasks = new HashMap<>(4); - } - - public AllocationID getAllocationID() { - return allocationID; - } - - public ResourceID getResourceID() { - return resourceID; - } - - public boolean add(Task task) { - // sanity check - Preconditions.checkArgument(allocationID.equals(task.getAllocationID())); - - Task oldTask = tasks.put(task.getExecutionId(), task); - - if (oldTask != null) { - tasks.put(task.getExecutionId(), oldTask); - return false; - } else { - return true; - } - } - - public Task remove(Task task) { - return tasks.remove(task.getExecutionId()); - } - - public void clear() { - tasks.clear(); - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java deleted file mode 100644 index e67fd52..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskexecutor; - -import org.apache.flink.runtime.taskmanager.Task; -import org.apache.flink.util.Preconditions; - -/** - * Mapping between a {@link Task} and its {@link TaskSlot}. 
- */ -public class TaskSlotMapping { - - private final Task task; - private final TaskSlot taskSlot; - - public TaskSlotMapping(Task task, TaskSlot taskSlot) { - this.task = Preconditions.checkNotNull(task); - this.taskSlot = Preconditions.checkNotNull(taskSlot); - } - - public Task getTask() { - return task; - } - - public TaskSlot getTaskSlot() { - return taskSlot; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotActions.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotActions.java new file mode 100644 index 0000000..f7ed235 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotActions.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor.slot; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; + +import java.util.UUID; + +/** + * Interface to trigger slot actions from within the {@link TaskSlotTable}. + */ +public interface SlotActions { + + /** + * Free the task slot with the given allocation id. + * + * @param allocationId to identify the slot to be freed + */ + void freeSlot(AllocationID allocationId); + + /** + * Timeout the task slot for the given allocation id. The timeout is identified by the given + * ticket to filter invalid timeouts out. + * + * @param allocationId identifying the task slot to be timed out + * @param ticket allowing to filter invalid timeouts out + */ + void timeoutSlot(AllocationID allocationId, UUID ticket); +} http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotActiveException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotActiveException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotActiveException.java new file mode 100644 index 0000000..b0ddc5d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotActiveException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor.slot; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; + +/** + * Exception indicating that the given {@link TaskSlot} was not in state active. + */ +public class SlotNotActiveException extends Exception { + + private static final long serialVersionUID = 4305837511564584L; + + public SlotNotActiveException(JobID jobId, AllocationID allocationId) { + super("No active slot for job " + jobId + " with allocation id " + allocationId + '.'); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotFoundException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotFoundException.java new file mode 100644 index 0000000..c639b16 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotFoundException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor.slot; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; + +/** + * Exception indicating that a {@link TaskSlot} could not be found. + */ +public class SlotNotFoundException extends Exception { + + private static final long serialVersionUID = -883614807750137925L; + + public SlotNotFoundException(AllocationID allocationId) { + this("Could not find slot for " + allocationId + '.'); + } + + public SlotNotFoundException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java new file mode 100644 index 0000000..0942772 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor.slot; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.util.Preconditions; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * Container for multiple {@link Task} belonging to the same slot. A {@link TaskSlot} can be in one + * of the following states: + * <ul> + * <li>Free - The slot is empty and not allocated to a job</li> + * <li>Releasing - The slot is about to be freed after it has become empty.</li> + * <li>Allocated - The slot has been allocated for a job.</li> + * <li>Active - The slot is in active use by a job manager which is the leader of the allocating job.</li> + * </ul> + * <p> + * A task slot can only be allocated if it is in state free. An allocated task slot can transition + * to state active. + *<p> + * An active slot allows to add tasks from the respective job and with the correct allocation id. + * An active slot can be marked as inactive which sets the state back to allocated. + * <p> + * An allocated or active slot can only be freed if it is empty. If it is not empty, then it's state + * can be set to releasing indicating that it can be freed once it becomes empty. 
+ */ +public class TaskSlot { + + /** Index of the task slot */ + private final int index; + + /** Resource characteristics for this slot */ + private final ResourceProfile resourceProfile; + + /** Tasks running in this slot */ + private final Map<ExecutionAttemptID, Task> tasks; + + /** State of this slot */ + private TaskSlotState state; + + /** Job id to which the slot has been allocated; null if not allocated */ + private JobID jobId; + + /** Allocation id of this slot; null if not allocated */ + private AllocationID allocationId; + + TaskSlot(final int index, final ResourceProfile resourceProfile) { + Preconditions.checkArgument(0 <= index, "The index must be greater than or equal to 0."); + this.index = index; + this.resourceProfile = Preconditions.checkNotNull(resourceProfile); + + this.tasks = new HashMap<>(4); + this.state = TaskSlotState.FREE; + + this.jobId = null; + this.allocationId = null; + } + + // ---------------------------------------------------------------------------------- + // State accessors + // ---------------------------------------------------------------------------------- + + public int getIndex() { + return index; + } + + public ResourceProfile getResourceProfile() { + return resourceProfile; + } + + public JobID getJobId() { + return jobId; + } + + public AllocationID getAllocationId() { + return allocationId; + } + + TaskSlotState getState() { + return state; + } + + public boolean isEmpty() { + return tasks.isEmpty(); + } + + public boolean isFree() { + return TaskSlotState.FREE == state; + } + + public boolean isActive(JobID activeJobId, AllocationID activeAllocationId) { + Preconditions.checkNotNull(activeJobId); + Preconditions.checkNotNull(activeAllocationId); + + return TaskSlotState.ACTIVE == state && + activeJobId.equals(jobId) && + activeAllocationId.equals(allocationId); + } + + public boolean isAllocated(JobID jobIdToCheck, AllocationID allocationIDToCheck) { + Preconditions.checkNotNull(jobIdToCheck); +
Preconditions.checkNotNull(allocationIDToCheck); + + return jobIdToCheck.equals(jobId) && allocationIDToCheck.equals(allocationId) && + (TaskSlotState.ACTIVE == state || TaskSlotState.ALLOCATED == state); + } + + public boolean isReleasing() { + return TaskSlotState.RELEASING == state; + } + + /** + * Get all tasks running in this task slot. + * + * @return Iterator to all currently contained tasks in this task slot. + */ + public Iterator<Task> getTasks() { + return tasks.values().iterator(); + } + + // ---------------------------------------------------------------------------------- + // State changing methods + // ---------------------------------------------------------------------------------- + + /** + * Add the given task to the task slot. This is only possible if there is not already another + * task with the same execution attempt id added to the task slot. In this case, the method + * returns true. Otherwise the task slot is left unchanged and false is returned. + * + * In case that the task slot state is not active an {@link IllegalStateException} is thrown. + * In case that the task's job id and allocation id don't match with the job id and allocation + * id for which the task slot has been allocated, an {@link IllegalArgumentException} is thrown. 
+ * + * @param task to be added to the task slot + * @throws IllegalStateException if the task slot is not in state active + * @return true if the task was added to the task slot; otherwise false + */ + public boolean add(Task task) { + // Check that this slot has been assigned to the job sending this task + Preconditions.checkArgument(task.getJobID().equals(jobId), "The task's job id does not match the " + + "job id for which the slot has been allocated."); + Preconditions.checkArgument(task.getAllocationId().equals(allocationId), "The task's allocation " + + "id does not match the allocation id for which the slot has been allocated."); + Preconditions.checkState(TaskSlotState.ACTIVE == state, "The task slot is not in state active."); + + Task oldTask = tasks.put(task.getExecutionId(), task); + + if (oldTask != null) { + tasks.put(task.getExecutionId(), oldTask); + return false; + } else { + return true; + } + } + + /** + * Remove the task identified by the given execution attempt id. + * + * @param executionAttemptId identifying the task to be removed + * @return The removed task if there was any; otherwise null. + */ + public Task remove(ExecutionAttemptID executionAttemptId) { + return tasks.remove(executionAttemptId); + } + + /** + * Removes all tasks from this task slot. + */ + public void clear() { + tasks.clear(); + } + + /** + * Allocate the task slot for the given job and allocation id. If the slot could be allocated, + * or is already allocated/active for the given job and allocation id, then the method returns + * true. Otherwise it returns false. + * + * A slot can only be allocated if it's current state is free. 
+ * + * @param newJobId to allocate the slot for + * @param newAllocationId to identify the slot allocation + * @return True if the slot was allocated for the given job and allocation id; otherwise false + */ + public boolean allocate(JobID newJobId, AllocationID newAllocationId) { + if (TaskSlotState.FREE == state) { + // sanity checks + Preconditions.checkState(allocationId == null); + Preconditions.checkState(jobId == null); + + this.jobId = Preconditions.checkNotNull(newJobId); + this.allocationId = Preconditions.checkNotNull(newAllocationId); + + state = TaskSlotState.ALLOCATED; + + return true; + } else if (TaskSlotState.ALLOCATED == state || TaskSlotState.ACTIVE == state) { + Preconditions.checkNotNull(newJobId); + Preconditions.checkNotNull(newAllocationId); + + return newJobId.equals(jobId) && newAllocationId.equals(allocationId); + } else { + return false; + } + } + + /** + * Mark this slot as active. A slot can only be marked active if it's in state allocated. + * + * The method returns true if the slot was set to active. Otherwise it returns false. + * + * @return True if the new state of the slot is active; otherwise false + */ + public boolean markActive() { + if (TaskSlotState.ALLOCATED == state || TaskSlotState.ACTIVE == state) { + state = TaskSlotState.ACTIVE; + + return true; + } else { + return false; + } + } + + /** + * Mark the slot as inactive/allocated. A slot can only be marked as inactive/allocated if it's + * in state allocated or active. + * + * @return True if the new state of the slot is allocated; otherwise false + */ + public boolean markInactive() { + if (TaskSlotState.ACTIVE == state || TaskSlotState.ALLOCATED == state) { + state = TaskSlotState.ALLOCATED; + + return true; + } else { + return false; + } + } + + /** + * Mark the slot as free. A slot can only marked as free if it's empty. 
+ * + * @return True if the new state is free; otherwise false + */ + public boolean markFree() { + if (isEmpty()) { + state = TaskSlotState.FREE; + this.jobId = null; + this.allocationId = null; + + return true; + } else { + return false; + } + } + + /** + * Mark this slot as releasing. A slot can always be marked as releasing. + * + * @return True + */ + public boolean markReleasing() { + state = TaskSlotState.RELEASING; + return true; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotState.java new file mode 100644 index 0000000..e3ba903 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotState.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor.slot; + +/** + * Internal task slot state + */ +enum TaskSlotState { + ACTIVE, // Slot is in active use by a job manager responsible for a job + ALLOCATED, // Slot has been allocated for a job but not yet given to a job manager + RELEASING, // Slot is not empty but tasks are failed. Upon removal of all tasks, it will be released + FREE // Slot is free +} http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java new file mode 100644 index 0000000..42cb919 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java @@ -0,0 +1,682 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor.slot; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.UUID; + +/** + * Container for multiple {@link TaskSlot} instances. Additionally, it maintains multiple indices + * for faster access to tasks and sets of allocated slots. + * <p> + * The task slot table automatically registers timeouts for allocated slots which cannot be assigned + * to a job manager. + * <p> + * Before the task slot table can be used, it must be started via the {@link #start} method. 
+ */ +public class TaskSlotTable implements TimeoutListener<AllocationID> { + + private static final Logger LOG = LoggerFactory.getLogger(TaskSlotTable.class); + + /** Timer service used to time out allocated slots */ + private final TimerService<AllocationID> timerService; + + /** The list of all task slots */ + private final List<TaskSlot> taskSlots; + + /** Mapping from allocation id to task slot */ + private final Map<AllocationID, TaskSlot> allocationIDTaskSlotMap; + + /** Mapping from execution attempt id to task and task slot */ + private final Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings; + + /** Mapping from job id to allocated slots for a job */ + private final Map<JobID, Set<AllocationID>> slotsPerJob; + + /** Interface for slot actions, such as freeing them or timing them out */ + private SlotActions slotActions; + + /** The timeout for allocated slots */ + private Time slotTimeout; + + /** Whether the table has been started */ + private boolean started; + + public TaskSlotTable( + final Collection<ResourceProfile> resourceProfiles, + final TimerService<AllocationID> timerService) { + + int numberSlots = resourceProfiles.size(); + + Preconditions.checkArgument(0 < numberSlots, "The number of task slots must be greater than 0."); + + this.timerService = Preconditions.checkNotNull(timerService); + + taskSlots = Arrays.asList(new TaskSlot[numberSlots]); + + int index = 0; + + // create the task slots for the given resource profiles + for (ResourceProfile resourceProfile: resourceProfiles) { + taskSlots.set(index, new TaskSlot(index, resourceProfile)); + ++index; + } + + allocationIDTaskSlotMap = new HashMap<>(numberSlots); + + taskSlotMappings = new HashMap<>(4 * numberSlots); + + slotsPerJob = new HashMap<>(4); + + slotActions = null; + slotTimeout = null; + started = false; + } + + /** + * Start the task slot table with the given slot actions and slot timeout value. 
+ * + * @param initialSlotActions to use for slot actions + * @param initialSlotTimeout to use for slot timeouts + */ + public void start(SlotActions initialSlotActions, Time initialSlotTimeout) { + this.slotActions = Preconditions.checkNotNull(initialSlotActions); + this.slotTimeout = Preconditions.checkNotNull(initialSlotTimeout); + + timerService.start(this); + + started = true; + } + + /** + * Stop the task slot table. + */ + public void stop() { + started = false; + timerService.stop(); + slotTimeout = null; + slotActions = null; + } + + // --------------------------------------------------------------------- + // Slot methods + // --------------------------------------------------------------------- + + /** + * Allocate the slot with the given index for the given job and allocation id. Returns true if + * the slot could be allocated. Otherwise it returns false. + * + * @param index of the task slot to allocate + * @param jobId to allocate the task slot for + * @param allocationId identifying the allocation + * @return True if the task slot could be allocated; otherwise false + */ + public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId) { + checkInit(); + + TaskSlot taskSlot = taskSlots.get(index); + + boolean result = taskSlot.allocate(jobId, allocationId); + + if (result) { + // update the allocation id to task slot map + allocationIDTaskSlotMap.put(allocationId, taskSlot); + + // register a timeout for this slot since it's in state allocated + timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit()); + + // add this slot to the set of job slots + Set<AllocationID> slots = slotsPerJob.get(jobId); + + if (slots == null) { + slots = new HashSet<>(4); + slotsPerJob.put(jobId, slots); + } + + slots.add(allocationId); + } + + return result; + } + + /** + * Marks the slot under the given allocation id as active. If the slot could not be found, then + * a {@link SlotNotFoundException} is thrown.
+ * + * @param allocationId to identify the task slot to mark as active + * @throws SlotNotFoundException if the slot could not be found for the given allocation id + * @return True if the slot could be marked active + */ + public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException { + checkInit(); + + TaskSlot taskSlot = getTaskSlot(allocationId); + + if (taskSlot != null) { + if (taskSlot.markActive()) { + // unregister a potential timeout + timerService.unregisterTimeout(allocationId); + + return true; + } else { + return false; + } + } else { + throw new SlotNotFoundException(allocationId); + } + } + + /** + * Marks the slot under the given allocation id as inactive. If the slot could not be found, + * then a {@link SlotNotFoundException} is thrown. + * + * @param allocationId to identify the task slot to mark as inactive + * @throws SlotNotFoundException if the slot could not be found for the given allocation id + * @return True if the slot could be marked inactive + */ + public boolean markSlotInactive(AllocationID allocationId) throws SlotNotFoundException { + checkInit(); + + TaskSlot taskSlot = getTaskSlot(allocationId); + + if (taskSlot != null) { + if (taskSlot.markInactive()) { + // register a timeout to free the slot + timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit()); + + return true; + } else { + return false; + } + } else { + throw new SlotNotFoundException(allocationId); + } + } + + /** + * Try to free the slot. If the slot is empty it will set the state of the task slot to free + * and return its index. If the slot is not empty, then it will set the state of the task slot + * to releasing, fail all tasks and return -1. 
+ * + * @param allocationId identifying the task slot to be freed + * @throws SlotNotFoundException if there is not task slot for the given allocation id + * @return Index of the freed slot if the slot could be freed; otherwise -1 + */ + public int freeSlot(AllocationID allocationId) throws SlotNotFoundException { + return freeSlot(allocationId, new Exception("The task slot of this task is being freed.")); + } + + /** + * Tries to free the slot. If the slot is empty it will set the state of the task slot to free + * and return its index. If the slot is not empty, then it will set the state of the task slot + * to releasing, fail all tasks and return -1. + * + * @param allocationId identifying the task slot to be freed + * @param cause to fail the tasks with if slot is not empty + * @throws SlotNotFoundException if there is not task slot for the given allocation id + * @return Index of the freed slot if the slot could be freed; otherwise -1 + */ + public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException { + checkInit(); + + TaskSlot taskSlot = getTaskSlot(allocationId); + + if (taskSlot != null) { + LOG.info("Free slot {}.", allocationId, cause); + + final JobID jobId = taskSlot.getJobId(); + + if (taskSlot.markFree()) { + // remove the allocation id to task slot mapping + allocationIDTaskSlotMap.remove(allocationId); + + // unregister a potential timeout + timerService.unregisterTimeout(allocationId); + + Set<AllocationID> slots = slotsPerJob.get(jobId); + + if (slots == null) { + throw new IllegalStateException("There are no more slots allocated for the job " + jobId + + ". 
This indicates a programming bug."); + } + + slots.remove(allocationId); + + if (slots.isEmpty()) { + slotsPerJob.remove(jobId); + } + + return taskSlot.getIndex(); + } else { + // we couldn't free the task slot because it still contains task, fail the tasks + // and set the slot state to releasing so that it gets eventually freed + taskSlot.markReleasing(); + + Iterator<Task> taskIterator = taskSlot.getTasks(); + + while (taskIterator.hasNext()) { + taskIterator.next().failExternally(cause); + } + + return -1; + } + } else { + throw new SlotNotFoundException(allocationId); + } + } + + /** + * Check whether the timeout with ticket is valid for the given allocation id. + * + * @param allocationId to check against + * @param ticket of the timeout + * @return True if the timeout is valid; otherwise false + */ + public boolean isValidTimeout(AllocationID allocationId, UUID ticket) { + checkInit(); + + return timerService.isValid(allocationId, ticket); + } + + /** + * Check whether the slot for the given index is allocated for the given job and allocation id. + * + * @param index of the task slot + * @param jobId for which the task slot should be allocated + * @param allocationId which should match the task slot's allocation id + * @return True if the given task slot is allocated for the given job and allocation id + */ + public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) { + checkInit(); + + TaskSlot taskSlot = taskSlots.get(index); + + return taskSlot.isAllocated(jobId, allocationId); + } + + /** + * Check whether there exists an active slot for the given job and allocation id. + * + * @param jobId of the allocated slot + * @param allocationId identifying the allocation + * @return True if there exists a task slot which is active for the given job and allocation id. 
+ */ + public boolean existActiveSlot(JobID jobId, AllocationID allocationId) { + TaskSlot taskSlot = getTaskSlot(allocationId); + + if (taskSlot != null) { + return taskSlot.isActive(jobId, allocationId); + } else { + return false; + } + } + + /** + * Check whether the task slot with the given index is free. + * + * @param index of the task slot + * @return True if the task slot is free; otherwise false + */ + public boolean isSlotFree(int index) { + TaskSlot taskSlot = taskSlots.get(index); + + return taskSlot.isFree(); + } + + /** + * Check whether the job has allocated (not active) slots. + * + * @param jobId for which to check for allocated slots + * @return True if there are allocated slots for the given job id. + */ + public boolean hasAllocatedSlots(JobID jobId) { + return getAllocatedSlots(jobId).hasNext(); + } + + /** + * Return an iterator of allocated slots (their allocation ids) for the given job id. + * + * @param jobId for which to return the allocated slots + * @return Iterator of allocation ids of allocated slots. + */ + public Iterator<AllocationID> getAllocatedSlots(JobID jobId) { + return new AllocationIDIterator(jobId, TaskSlotState.ALLOCATED); + } + + /** + * Return an iterator of active slots (their allocation ids) for the given job id. + * + * @param jobId for which to return the active slots + * @return Iterator of allocation ids of active slots + */ + public Iterator<AllocationID> getActiveSlots(JobID jobId) { + return new AllocationIDIterator(jobId, TaskSlotState.ACTIVE); + } + + // --------------------------------------------------------------------- + // Task methods + // --------------------------------------------------------------------- + + /** + * Add the given task to the slot identified by the task's allocation id.
+ * + * @param task to add to the task slot with the respective allocation id + * @throws SlotNotFoundException if there was no slot for the given allocation id + * @throws SlotNotActiveException if there was no slot active for task's job and allocation id + * @return True if the task could be added to the task slot; otherwise false + */ + public boolean addTask(Task task) throws SlotNotFoundException, SlotNotActiveException { + Preconditions.checkNotNull(task); + + TaskSlot taskSlot = getTaskSlot(task.getAllocationId()); + + if (taskSlot != null) { + if (taskSlot.isActive(task.getJobID(), task.getAllocationId())) { + if (taskSlot.add(task)) { + taskSlotMappings.put(task.getExecutionId(), new TaskSlotMapping(task, taskSlot)); + + return true; + } else { + return false; + } + } else { + throw new SlotNotActiveException(task.getJobID(), task.getAllocationId()); + } + } else { + throw new SlotNotFoundException(task.getAllocationId()); + } + } + + /** + * Remove the task with the given execution attempt id from its task slot. If the owning task + * slot is in state releasing and empty after removing the task, the slot is freed via the + * slot actions. + * + * @param executionAttemptID identifying the task to remove + * @return The removed task if there is any for the given execution attempt id; otherwise null + */ + public Task removeTask(ExecutionAttemptID executionAttemptID) { + TaskSlotMapping taskSlotMapping = taskSlotMappings.remove(executionAttemptID); + + if (taskSlotMapping != null) { + Task task = taskSlotMapping.getTask(); + TaskSlot taskSlot = taskSlotMapping.getTaskSlot(); + + taskSlot.remove(task.getExecutionId()); + + if (taskSlot.isReleasing() && taskSlot.isEmpty()) { + slotActions.freeSlot(taskSlot.getAllocationId()); + } + + return task; + } else { + return null; + } + } + + /** + * Get the task for the given execution attempt id. If none could be found, then return null.
+	 *
+	 * @param executionAttemptID identifying the requested task
+	 * @return The task registered under the execution attempt id if it exists; otherwise null
+	 */
+	public Task getTask(ExecutionAttemptID executionAttemptID) {
+		final TaskSlotMapping mapping = taskSlotMappings.get(executionAttemptID);
+
+		return mapping == null ? null : mapping.getTask();
+	}
+
+	/**
+	 * Return an iterator over the tasks of the given job.
+	 *
+	 * @param jobId identifying the job of the requested tasks
+	 * @return Iterator over the tasks of the given job
+	 */
+	public Iterator<Task> getTasks(JobID jobId) {
+		final Iterator<Task> tasksOfJob = new TaskIterator(jobId);
+
+		return tasksOfJob;
+	}
+
+	// ---------------------------------------------------------------------
+	// TimeoutListener methods
+	// ---------------------------------------------------------------------
+
+	/**
+	 * Forward the timeout of the given allocation id to the slot actions. Nothing happens if no
+	 * slot actions are set.
+	 *
+	 * @param key allocation id whose timeout fired
+	 * @param ticket identifying the fired timeout
+	 */
+	@Override
+	public void notifyTimeout(AllocationID key, UUID ticket) {
+		if (slotActions == null) {
+			return;
+		}
+
+		slotActions.timeoutSlot(key, ticket);
+	}
+
+	// ---------------------------------------------------------------------
+	// Internal methods
+	// ---------------------------------------------------------------------
+
+	/**
+	 * Look up the task slot which is registered under the given allocation id.
+	 *
+	 * @param allocationId to look up; must not be null
+	 * @return The task slot for the allocation id, or null if there is none
+	 */
+	private TaskSlot getTaskSlot(AllocationID allocationId) {
+		Preconditions.checkNotNull(allocationId);
+
+		return allocationIDTaskSlotMap.get(allocationId);
+	}
+
+	/**
+	 * State check which fails if the table has not been started.
+	 */
+	private void checkInit() {
+		final String message = "The " + TaskSlotTable.class.getSimpleName() + " has to be started.";
+
+		Preconditions.checkState(started, message);
+	}
+
+	// ---------------------------------------------------------------------
+	// Static utility classes
+	// ---------------------------------------------------------------------
+
+	/**
+	 * Mapping class between a {@link Task} and a {@link TaskSlot}.
+	 */
+	private static final class TaskSlotMapping {
+		private final Task task;
+		private final TaskSlot taskSlot;
+
+		/**
+		 * @param task which runs in the slot; must not be null
+		 * @param taskSlot which owns the task; must not be null
+		 */
+		private TaskSlotMapping(Task task, TaskSlot taskSlot) {
+			Preconditions.checkNotNull(task);
+			Preconditions.checkNotNull(taskSlot);
+
+			this.task = task;
+			this.taskSlot = taskSlot;
+		}
+
+		public Task getTask() {
+			return task;
+		}
+
+		public TaskSlot getTaskSlot() {
+			return taskSlot;
+		}
+	}
+
+	/**
+	 * Iterator over the {@link AllocationID}s of those {@link TaskSlot}s of a given job which are
+	 * in the given state. It is a thin view on top of a {@link TaskSlotIterator}.
+	 */
+	private final class AllocationIDIterator implements Iterator<AllocationID> {
+		private final Iterator<TaskSlot> slotIterator;
+
+		private AllocationIDIterator(JobID jobId, TaskSlotState state) {
+			slotIterator = new TaskSlotIterator(jobId, state);
+		}
+
+		@Override
+		public boolean hasNext() {
+			return slotIterator.hasNext();
+		}
+
+		@Override
+		public AllocationID next() {
+			final AllocationID allocationId;
+
+			try {
+				allocationId = slotIterator.next().getAllocationId();
+			} catch (NoSuchElementException e) {
+				// report the exhaustion in terms of what this iterator hands out
+				throw new NoSuchElementException("No more allocation ids.");
+			}
+
+			return allocationId;
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("Cannot remove allocation ids via this iterator.");
+		}
+	}
+
+	/**
+	 * Iterator over {@link TaskSlot} which fulfill a given state condition and belong to the given
+	 * job.
+	 */
+	private final class TaskSlotIterator implements Iterator<TaskSlot> {
+		private final Iterator<AllocationID> allSlots;
+		private final TaskSlotState state;
+
+		/** Slot found by hasNext() which has not been handed out by next() yet; null if none */
+		private TaskSlot currentSlot;
+
+		private TaskSlotIterator(JobID jobId, TaskSlotState state) {
+			final Set<AllocationID> slotsOfJob = slotsPerJob.get(jobId);
+			final boolean hasSlots = slotsOfJob != null && !slotsOfJob.isEmpty();
+
+			if (hasSlots) {
+				allSlots = slotsOfJob.iterator();
+			} else {
+				allSlots = Collections.emptyIterator();
+			}
+
+			this.state = Preconditions.checkNotNull(state);
+			this.currentSlot = null;
+		}
+
+		@Override
+		public boolean hasNext() {
+			// advance until a slot in the requested state is buffered or the ids are exhausted
+			while (currentSlot == null && allSlots.hasNext()) {
+				final TaskSlot candidate = getTaskSlot(allSlots.next());
+
+				if (candidate != null && candidate.getState() == state) {
+					currentSlot = candidate;
+				}
+			}
+
+			return currentSlot != null;
+		}
+
+		@Override
+		public TaskSlot next() {
+			if (currentSlot == null) {
+				// nothing buffered by hasNext(): search the remaining allocation ids directly
+				for (;;) {
+					final AllocationID allocationId;
+
+					try {
+						allocationId = allSlots.next();
+					} catch (NoSuchElementException e) {
+						throw new NoSuchElementException("No more task slots.");
+					}
+
+					final TaskSlot candidate = getTaskSlot(allocationId);
+
+					if (candidate != null && candidate.getState() == state) {
+						return candidate;
+					}
+				}
+			}
+
+			// hand out the buffered slot exactly once
+			final TaskSlot buffered = currentSlot;
+			currentSlot = null;
+
+			return buffered;
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("Cannot remove task slots via this iterator.");
+		}
+	}
+
+	/**
+	 * Iterator over the {@link Task}s of a given job. Only the tasks of slots in state
+	 * {@link TaskSlotState#ACTIVE} are visited.
+	 */
+	private final class TaskIterator implements Iterator<Task> {
+		private final Iterator<TaskSlot> taskSlotIterator;
+
+		/** Tasks of the slot which is currently being visited; null before the first slot */
+		private Iterator<Task> currentTasks;
+
+		private TaskIterator(JobID jobId) {
+			this.taskSlotIterator = new TaskSlotIterator(jobId, TaskSlotState.ACTIVE);
+
+			this.currentTasks = null;
+		}
+
+		/** True if the currently visited slot still has a task to hand out. */
+		private boolean hasPendingTask() {
+			return currentTasks != null && currentTasks.hasNext();
+		}
+
+		@Override
+		public boolean hasNext() {
+			// skip over slots without (remaining) tasks
+			while (!hasPendingTask() && taskSlotIterator.hasNext()) {
+				currentTasks = taskSlotIterator.next().getTasks();
+			}
+
+			return hasPendingTask();
+		}
+
+		@Override
+		public Task next() {
+			while (!hasPendingTask()) {
+				final TaskSlot nextSlot;
+
+				try {
+					nextSlot = taskSlotIterator.next();
+				} catch (NoSuchElementException e) {
+					throw new NoSuchElementException("No more tasks.");
+				}
+
+				currentTasks = nextSlot.getTasks();
+			}
+
+			return currentTasks.next();
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("Cannot remove tasks via this iterator.");
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimeoutListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimeoutListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimeoutListener.java new file mode 100644 index 0000000..3e75d74 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimeoutListener.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor.slot; + +import java.util.UUID; + +/** + * Listener for timeout events emitted by the {@link TimerService}. + * + * @param <K> Type of the timeout key + */ +public interface TimeoutListener<K> { + + /** + * Notify the listener that the timeout registered for the given key has fired. The method is + * also handed the ticket of that timeout, which allows the listener to recognize outdated + * timeout events, e.g. by checking it with {@link TimerService#isValid}. + * + * @param key identifying the timed out event + * @param ticket identifying the fired timeout; used to check whether the timeout is still valid + */ + void notifyTimeout(K key, UUID ticket); +} http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java new file mode 100644 index 0000000..e28e801 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Service to register timeouts for a given key. The timeouts are identified by a ticket so that
+ * newly registered timeouts for the same key can be distinguished from older timeouts.
+ *
+ * <p>NOTE(review): the registered timeouts are kept in a plain {@link HashMap} without any
+ * synchronization, so the methods of this class presumably have to be called from a single
+ * thread -- confirm against the callers.
+ *
+ * @param <K> Type of the key
+ */
+public class TimerService<K> {
+
+	/** Executor service for the scheduled timeouts */
+	private final ScheduledExecutorService scheduledExecutorService;
+
+	/** Map of currently active timeouts */
+	private final Map<K, Timeout<K>> timeouts;
+
+	/** Listener which is notified about occurring timeouts */
+	private TimeoutListener<K> timeoutListener;
+
+	/**
+	 * Create a timer service which schedules its timeouts on the given executor.
+	 *
+	 * @param scheduledExecutorService used to schedule the timeouts; must not be null
+	 */
+	public TimerService(final ScheduledExecutorService scheduledExecutorService) {
+		this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
+
+		this.timeouts = new HashMap<>(16);
+		this.timeoutListener = null;
+	}
+
+	/**
+	 * Start the service by setting the listener which is notified about timeouts. The state check
+	 * fails if the executor has already been shut down (e.g. by {@link #stop()}) or if a listener
+	 * is already set.
+	 *
+	 * @param initialTimeoutListener to notify about timeouts; must not be null
+	 */
+	public void start(TimeoutListener<K> initialTimeoutListener) {
+		// sanity check; We only allow to assign a timeout listener once
+		Preconditions.checkState(!scheduledExecutorService.isShutdown());
+		Preconditions.checkState(timeoutListener == null);
+
+		this.timeoutListener = Preconditions.checkNotNull(initialTimeoutListener);
+	}
+
+	/**
+	 * Stop the service: cancel all registered timeouts, drop the listener and shut down the
+	 * executor service.
+	 */
+	public void stop() {
+		// Cancel via the values and clear the map afterwards. Unregistering key by key while
+		// iterating over the key set would remove entries from the map which is being iterated,
+		// which fails with a ConcurrentModificationException as soon as more than one timeout
+		// is registered.
+		for (Timeout<K> timeout : timeouts.values()) {
+			timeout.cancel();
+		}
+
+		timeouts.clear();
+
+		timeoutListener = null;
+
+		scheduledExecutorService.shutdown();
+	}
+
+	/**
+	 * Register a timeout for the given key which shall occur in the given delay. A timeout which
+	 * is already registered for the key is cancelled and replaced.
+	 *
+	 * @param key for which to register the timeout
+	 * @param delay until the timeout
+	 * @param unit of the timeout delay
+	 */
+	public void registerTimeout(final K key, final long delay, final TimeUnit unit) {
+		Preconditions.checkState(timeoutListener != null, "The " + getClass().getSimpleName() +
+			" has not been started.");
+
+		// cancels and removes a previous timeout for the key; does nothing if there is none
+		unregisterTimeout(key);
+
+		timeouts.put(key, new Timeout<>(timeoutListener, key, delay, unit, scheduledExecutorService));
+	}
+
+	/**
+	 * Unregister the timeout for the given key. Nothing happens if no timeout is registered for
+	 * the key.
+	 *
+	 * @param key for which to unregister the timeout
+	 */
+	public void unregisterTimeout(K key) {
+		Timeout<K> timeout = timeouts.remove(key);
+
+		if (timeout != null) {
+			timeout.cancel();
+		}
+	}
+
+	/**
+	 * Check whether the timeout for the given key and ticket is still valid (not yet unregistered
+	 * and not yet overwritten).
+	 *
+	 * @param key for which to check the timeout
+	 * @param ticket of the timeout
+	 * @return True if the timeout ticket is still valid; otherwise false
+	 */
+	public boolean isValid(K key, UUID ticket) {
+		final Timeout<K> timeout = timeouts.get(key);
+
+		return timeout != null && timeout.getTicket().equals(ticket);
+	}
+
+	// ---------------------------------------------------------------------
+	// Static utility classes
+	// ---------------------------------------------------------------------
+
+	/**
+	 * A single scheduled timeout. It schedules itself on construction and, when run, notifies the
+	 * listener with its key and ticket.
+	 */
+	private static final class Timeout<K> implements Runnable {
+
+		private final TimeoutListener<K> timeoutListener;
+		private final K key;
+		private final ScheduledFuture<?> scheduledTimeout;
+		private final UUID ticket;
+
+		Timeout(
+			final TimeoutListener<K> timeoutListener,
+			final K key,
+			final long delay,
+			final TimeUnit unit,
+			final ScheduledExecutorService scheduledExecutorService) {
+
+			Preconditions.checkNotNull(scheduledExecutorService);
+
+			this.timeoutListener = Preconditions.checkNotNull(timeoutListener);
+			this.key = Preconditions.checkNotNull(key);
+
+			// The ticket has to be set before this object is handed to the executor. Otherwise
+			// a timeout which fires right away could run before the assignment and notify the
+			// listener with a null ticket.
+			this.ticket = UUID.randomUUID();
+			this.scheduledTimeout = scheduledExecutorService.schedule(this, delay, unit);
+		}
+
+		UUID getTicket() {
+			return ticket;
+		}
+
+		void cancel() {
+			scheduledTimeout.cancel(true);
+		}
+
+		@Override
+		public void run() {
+			timeoutListener.notifyTimeout(key, ticket);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 37ac0a3..f16255e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -128,7 +128,7 @@ public class Task implements Runnable, TaskActions { private final ExecutionAttemptID executionId; /** ID which identifies the slot in which the task is supposed to run */ - private final AllocationID allocationID; + private final AllocationID allocationId; /** TaskInfo object for this task */ private final TaskInfo taskInfo; @@ -265,7 +265,7 @@ public class Task implements Runnable, TaskActions { this.jobId = checkNotNull(tdd.getJobID()); this.vertexId = checkNotNull(tdd.getVertexID()); this.executionId = checkNotNull(tdd.getExecutionId()); - this.allocationID = checkNotNull(tdd.getAllocationID()); + this.allocationId = checkNotNull(tdd.getAllocationID()); this.taskNameWithSubtask = taskInfo.getTaskNameWithSubtasks(); this.jobConfiguration = checkNotNull(tdd.getJobConfiguration()); this.taskConfiguration = checkNotNull(tdd.getTaskConfiguration()); @@ -370,8 +370,8 @@ public class Task implements Runnable, TaskActions { return executionId; } - public AllocationID getAllocationID() { - return allocationID; + public AllocationID getAllocationId() { + return
allocationId; } public TaskInfo getTaskInfo() { http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java index af8aa69..97f42b1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.network.api.reader.BufferReader; http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index ecbd9b5..baae251 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequ import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; import org.apache.flink.runtime.rpc.FatalErrorHandler; import 
org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.TestLogger; @@ -83,6 +84,7 @@ public class TaskExecutorTest extends TestLogger { mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), + mock(TaskSlotTable.class), mock(FatalErrorHandler.class)); taskManager.start(); @@ -139,6 +141,7 @@ public class TaskExecutorTest extends TestLogger { mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), + mock(TaskSlotTable.class), mock(FatalErrorHandler.class)); taskManager.start(); @@ -211,6 +214,7 @@ public class TaskExecutorTest extends TestLogger { mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), + mock(TaskSlotTable.class), mock(FatalErrorHandler.class)); taskManager.start();
