Repository: flink Updated Branches: refs/heads/master 0e20b6130 -> e94a488dd
[FLINK-8504] [flip6] Deregister jobs from the JobLeaderService when no more slots allocated Let the TaskExecutor deregister jobs from the JobLeaderService once it has no more slots for this job allocated. This closes #5361. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e94a488d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e94a488d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e94a488d Branch: refs/heads/master Commit: e94a488dd78e7c2efdf55a67cea886ee15a641a6 Parents: 23ff120 Author: Till Rohrmann <[email protected]> Authored: Thu Jan 25 13:50:43 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Tue Jan 30 18:30:17 2018 +0100 ---------------------------------------------------------------------- .../runtime/taskexecutor/JobLeaderService.java | 40 +++--- .../runtime/taskexecutor/TaskExecutor.java | 80 ++++++------ .../taskexecutor/slot/TaskSlotTable.java | 45 ++++++- .../runtime/taskexecutor/TaskExecutorTest.java | 121 ++++++++++++++++++- 4 files changed, 223 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e94a488d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java index 3b4da4e..5376362 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import 
org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; @@ -37,11 +38,11 @@ import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; /** @@ -80,7 +81,9 @@ public class JobLeaderService { public JobLeaderService(TaskManagerLocation location) { this.ownLocation = Preconditions.checkNotNull(location); - jobLeaderServices = new HashMap<>(4); + // Has to be a concurrent hash map because tests might access this service + // concurrently via containsJob + jobLeaderServices = new ConcurrentHashMap<>(4); state = JobLeaderService.State.CREATED; @@ -147,18 +150,6 @@ public class JobLeaderService { } /** - * Check whether the service monitors the given job. - * - * @param jobId identifying the job - * @return True if the given job is monitored; otherwise false - */ - public boolean containsJob(JobID jobId) { - Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently not running."); - - return jobLeaderServices.containsKey(jobId); - } - - /** * Remove the given job from being monitored by the job leader service. 
* * @param jobId identifying the job to remove from monitoring @@ -199,9 +190,9 @@ public class JobLeaderService { JobLeaderService.JobManagerLeaderListener jobManagerLeaderListener = new JobManagerLeaderListener(jobId); - leaderRetrievalService.start(jobManagerLeaderListener); - jobLeaderServices.put(jobId, Tuple2.of(leaderRetrievalService, jobManagerLeaderListener)); + + leaderRetrievalService.start(jobManagerLeaderListener); } /** @@ -435,4 +426,21 @@ public class JobLeaderService { private enum State { CREATED, STARTED, STOPPED } + + // ----------------------------------------------------------- + // Testing methods + // ----------------------------------------------------------- + + /** + * Check whether the service monitors the given job. + * + * @param jobId identifying the job + * @return True if the given job is monitored; otherwise false + */ + @VisibleForTesting + public boolean containsJob(JobID jobId) { + Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently not running."); + + return jobLeaderServices.containsKey(jobId); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/e94a488d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index ad7414c..9df2e88 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -93,7 +93,6 @@ import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.Preconditions; import 
java.io.IOException; import java.net.InetSocketAddress; @@ -163,7 +162,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { // --------- job manager connections ----------- - private Map<ResourceID, JobManagerConnection> jobManagerConnections; + private final Map<ResourceID, JobManagerConnection> jobManagerConnections; // --------- task slot allocation table ----------- @@ -195,7 +194,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { JobLeaderService jobLeaderService, FatalErrorHandler fatalErrorHandler) { - super(rpcService, AkkaRpcServiceUtils.createRandomName(TaskExecutor.TASK_MANAGER_NAME)); + super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME)); checkArgument(taskManagerConfiguration.getNumberSlots() > 0, "The number of slots has to be larger than 0."); @@ -978,10 +977,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { ResourceID resourceID, JobMasterGateway jobMasterGateway, int blobPort) { - Preconditions.checkNotNull(jobID); - Preconditions.checkNotNull(resourceID); - Preconditions.checkNotNull(jobMasterGateway); - Preconditions.checkArgument(blobPort > 0 && blobPort < MAX_BLOB_PORT, "Blob server port is out of range."); + checkNotNull(jobID); + checkNotNull(resourceID); + checkNotNull(jobMasterGateway); + checkArgument(blobPort > 0 && blobPort < MAX_BLOB_PORT, "Blob server port is out of range."); TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterGateway); @@ -1029,7 +1028,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } private void disassociateFromJobManager(JobManagerConnection jobManagerConnection, Exception cause) throws IOException { - Preconditions.checkNotNull(jobManagerConnection); + checkNotNull(jobManagerConnection); JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway(); jobManagerGateway.disconnectTaskManager(getResourceID(), cause); 
jobManagerConnection.getLibraryCacheManager().shutdown(); @@ -1104,36 +1103,40 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } private void freeSlotInternal(AllocationID allocationId, Throwable cause) { - Preconditions.checkNotNull(allocationId); + checkNotNull(allocationId); try { - TaskSlot taskSlot = taskSlotTable.freeSlot(allocationId, cause); + final JobID jobId = taskSlotTable.getOwningJob(allocationId); - if (taskSlot != null && isConnectedToResourceManager()) { - // the slot was freed. Tell the RM about it - ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); + final int slotIndex = taskSlotTable.freeSlot(allocationId, cause); - resourceManagerGateway.notifySlotAvailable( - resourceManagerConnection.getRegistrationId(), - new SlotID(getResourceID(), taskSlot.getIndex()), - allocationId); + if (slotIndex != -1) { - // check whether we still have allocated slots for the same job - final JobID jobId = taskSlot.getJobId(); - final Iterator<Task> tasks = taskSlotTable.getTasks(jobId); + if (isConnectedToResourceManager()) { + // the slot was freed. 
Tell the RM about it + ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); - if (!tasks.hasNext()) { - // we can remove the job from the job leader service - try { - jobLeaderService.removeJob(jobId); - } catch (Exception e) { - log.info("Could not remove job {} from JobLeaderService.", jobId, e); - } + resourceManagerGateway.notifySlotAvailable( + resourceManagerConnection.getRegistrationId(), + new SlotID(getResourceID(), slotIndex), + allocationId); + } - closeJobManagerConnection( - jobId, - new FlinkException("TaskExecutor " + getAddress() + - " has no more allocated slots for job " + jobId + '.')); + if (jobId != null) { + // check whether we still have allocated slots for the same job + if (taskSlotTable.getAllocationIdsPerJob(jobId).isEmpty()) { + // we can remove the job from the job leader service + try { + jobLeaderService.removeJob(jobId); + } catch (Exception e) { + log.info("Could not remove job {} from JobLeaderService.", jobId, e); + } + + closeJobManagerConnection( + jobId, + new FlinkException("TaskExecutor " + getAddress() + + " has no more allocated slots for job " + jobId + '.')); + } } } } catch (SlotNotFoundException e) { @@ -1141,13 +1144,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } } - private void freeSlotInternal(AllocationID allocationId) { - freeSlotInternal(allocationId, new Exception("The slot " + allocationId + " is being freed.")); - } - private void timeoutSlot(AllocationID allocationId, UUID ticket) { - Preconditions.checkNotNull(allocationId); - Preconditions.checkNotNull(ticket); + checkNotNull(allocationId); + checkNotNull(ticket); if (taskSlotTable.isValidTimeout(allocationId, ticket)) { freeSlotInternal(allocationId, new Exception("The slot " + allocationId + " has timed out.")); @@ -1285,7 +1284,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { private final JobMasterGateway jobMasterGateway; private 
TaskManagerActionsImpl(JobMasterGateway jobMasterGateway) { - this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway); + this.jobMasterGateway = checkNotNull(jobMasterGateway); } @Override @@ -1318,7 +1317,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { @Override public void freeSlot(final AllocationID allocationId) { - runAsync(() -> TaskExecutor.this.freeSlotInternal(allocationId)); + runAsync(() -> + freeSlotInternal( + allocationId, + new FlinkException("TaskSlotTable requested freeing the TaskSlot " + allocationId + '.'))); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/e94a488d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java index fcb2761..f8f9164 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java @@ -133,6 +133,22 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { slotActions = null; } + /** + * Returns the set of all {@link AllocationID}s belonging to the given job. + * + * @param jobId for which to return the set of {@link AllocationID}.
+ * @return Set of {@link AllocationID} for the given job + */ + public Set<AllocationID> getAllocationIdsPerJob(JobID jobId) { + final Set<AllocationID> allocationIds = slotsPerJob.get(jobId); + + if (allocationIds == null) { + return Collections.emptySet(); + } else { + return Collections.unmodifiableSet(allocationIds); + } + } + // --------------------------------------------------------------------- // Slot report methods // --------------------------------------------------------------------- @@ -268,7 +284,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { * @throws SlotNotFoundException if there is not task slot for the given allocation id * @return Index of the freed slot if the slot could be freed; otherwise -1 */ - public TaskSlot freeSlot(AllocationID allocationId) throws SlotNotFoundException { + public int freeSlot(AllocationID allocationId) throws SlotNotFoundException { return freeSlot(allocationId, new Exception("The task slot of this task is being freed.")); } @@ -282,8 +298,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { * @throws SlotNotFoundException if there is not task slot for the given allocation id * @return The freed TaskSlot. If the TaskSlot cannot be freed then null. 
*/ - @Nullable - public TaskSlot freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException { + public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException { checkInit(); TaskSlot taskSlot = getTaskSlot(allocationId); @@ -317,7 +332,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { slotsPerJob.remove(jobId); } - return taskSlot; + return taskSlot.getIndex(); } else { // we couldn't free the task slot because it still contains task, fail the tasks // and set the slot state to releasing so that it gets eventually freed @@ -329,7 +344,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { taskIterator.next().failExternally(cause); } - return null; + return -1; } } else { throw new SlotNotFoundException(allocationId); @@ -422,6 +437,25 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { return new AllocationIDIterator(jobId, TaskSlotState.ACTIVE); } + /** + * Returns the owning job of the {@link TaskSlot} identified by the + * given {@link AllocationID}. 
+ * + * @param allocationId identifying the slot for which to retrieve the owning job + * @return Owning job of the specified {@link TaskSlot} or null if there is no slot for + * the given allocation id or if the slot has no owning job assigned + */ + @Nullable + public JobID getOwningJob(AllocationID allocationId) { + final TaskSlot taskSlot = getTaskSlot(allocationId); + + if (taskSlot != null) { + return taskSlot.getJobId(); + } else { + return null; + } + } + // --------------------------------------------------------------------- // Task methods // --------------------------------------------------------------------- @@ -538,6 +572,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { // Internal methods // --------------------------------------------------------------------- + @Nullable private TaskSlot getTaskSlot(AllocationID allocationId) { Preconditions.checkNotNull(allocationId); http://git-wip-us.apache.org/repos/asf/flink/blob/e94a488d/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 0c3adae..efd27f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -61,12 +61,14 @@ import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import 
org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; @@ -89,6 +91,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.testutils.category.Flip6; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; @@ -106,7 +109,6 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; -import java.io.File; import java.net.InetAddress; import java.util.Arrays; import java.util.Collection; @@ -122,6 +124,7 @@ import java.util.concurrent.TimeoutException; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -143,7 +146,6 @@ import static org.mockito.Mockito.when; public class TaskExecutorTest extends TestLogger { private final Time timeout = Time.milliseconds(10000L); - private final File tempDir = new File(System.getProperty("java.io.tmpdir")); private TimerService<AllocationID> timerService; @@ -1034,7 +1036,6 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, 
mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -1544,6 +1545,120 @@ public class TaskExecutorTest extends TestLogger { } /** + * Tests that a job is removed from the JobLeaderService once a TaskExecutor has + * no more slots assigned to this job. + * + * <p>See FLINK-8504 + */ + @Test + public void testRemoveJobFromJobLeaderService() throws Exception { + final Configuration configuration = new Configuration(); + final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); + final LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation(); + final JobLeaderService jobLeaderService = new JobLeaderService(localTaskManagerLocation); + final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + final TaskSlotTable taskSlotTable = new TaskSlotTable( + Collections.singleton(ResourceProfile.UNKNOWN), + timerService); + + final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + + final TestingLeaderRetrievalService resourceManagerLeaderRetriever = new TestingLeaderRetrievalService(); + haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever); + + final TaskExecutor taskExecutor = new TaskExecutor( + rpc, + taskManagerConfiguration, + localTaskManagerLocation, + mock(MemoryManager.class), + mock(IOManager.class), + new TaskExecutorLocalStateStoresManager(), + mock(NetworkEnvironment.class), + haServices, + new HeartbeatServices(1000L, 1000L), + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), + new BroadcastVariableManager(), + mock(FileCache.class), + taskSlotTable, + new JobManagerTable(), + jobLeaderService, + testingFatalErrorHandler); + + try { + final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + final ResourceManagerId resourceManagerId = 
resourceManagerGateway.getFencingToken(); + + rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway); + resourceManagerLeaderRetriever.notifyListener(resourceManagerGateway.getAddress(), resourceManagerId.toUUID()); + + final JobID jobId = new JobID(); + + final CompletableFuture<LeaderRetrievalListener> startFuture = new CompletableFuture<>(); + final CompletableFuture<Void> stopFuture = new CompletableFuture<>(); + + final StartStopNotifyingLeaderRetrievalService jobMasterLeaderRetriever = new StartStopNotifyingLeaderRetrievalService( + startFuture, + stopFuture); + haServices.setJobMasterLeaderRetriever(jobId, jobMasterLeaderRetriever); + + taskExecutor.start(); + + final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class); + + final SlotID slotId = new SlotID(localTaskManagerLocation.getResourceID(), 0); + final AllocationID allocationId = new AllocationID(); + + assertThat(startFuture.isDone(), is(false)); + assertThat(jobLeaderService.containsJob(jobId), is(false)); + + taskExecutorGateway.requestSlot( + slotId, + jobId, + allocationId, + "foobar", + resourceManagerId, + timeout).get(); + + // wait until the job leader retrieval service for jobId is started + startFuture.get(); + assertThat(jobLeaderService.containsJob(jobId), is(true)); + + taskExecutorGateway.freeSlot(allocationId, new FlinkException("Test exception"), timeout).get(); + + // wait until the job leader retrieval service for jobId is stopped because the job should get removed + stopFuture.get(); + assertThat(jobLeaderService.containsJob(jobId), is(false)); + + testingFatalErrorHandler.rethrowError(); + } finally { + RpcUtils.terminateRpcEndpoint(taskExecutor, timeout); + } + } + + private static final class StartStopNotifyingLeaderRetrievalService implements LeaderRetrievalService { + private final CompletableFuture<LeaderRetrievalListener> startFuture; + + private final CompletableFuture<Void> stopFuture; + + private
StartStopNotifyingLeaderRetrievalService( + CompletableFuture<LeaderRetrievalListener> startFuture, + CompletableFuture<Void> stopFuture) { + this.startFuture = startFuture; + this.stopFuture = stopFuture; + } + + @Override + public void start(LeaderRetrievalListener listener) throws Exception { + startFuture.complete(listener); + } + + @Override + public void stop() throws Exception { + stopFuture.complete(null); + } + } + + /** * Special {@link HeartbeatServices} which creates a {@link RecordingHeartbeatManagerImpl}. */ private static final class RecordingHeartbeatServices extends HeartbeatServices {
