Repository: flink
Updated Branches:
  refs/heads/master 8e1775afc -> d6aed38b3
[FLINK-5836] [flip6] Fix race condition between offer slot and submit task

Streamline test case

This closes #3371.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d6aed38b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d6aed38b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d6aed38b

Branch: refs/heads/master
Commit: d6aed38b3a15946d383d762030b5f5c1418388de
Parents: 8e1775a
Author: wenlong.lwl <[email protected]>
Authored: Fri Jan 6 16:32:08 2017 +0800
Committer: Till Rohrmann <[email protected]>
Committed: Wed Feb 22 14:25:38 2017 +0100

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutor.java | 35 ++--
 .../runtime/taskexecutor/TaskExecutorTest.java | 168 +++++++++++++++++++
 2 files changed, 186 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d6aed38b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 58bbfac..2980376 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -659,7 +659,22 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { final Collection<SlotOffer> reservedSlots = new HashSet<>(2); while (reservedSlotsIterator.hasNext()) { - reservedSlots.add(reservedSlotsIterator.next().generateSlotOffer()); + SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer(); + try { + if (!taskSlotTable.markSlotActive(offer.getAllocationId())) { + // the slot is either free or 
releasing at the moment + final String message = "Could not mark slot " + jobId + " active."; + log.debug(message); + jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), + leaderId, new Exception(message)); + } + } catch (SlotNotFoundException e) { + final String message = "Could not mark slot " + jobId + " active."; + jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), + leaderId, new Exception(message)); + continue; + } + reservedSlots.add(offer); } Future<Iterable<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots( @@ -674,22 +689,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { // check if the response is still valid if (isJobManagerConnectionValid(jobId, leaderId)) { // mark accepted slots active - for (SlotOffer acceptedSlot: acceptedSlots) { - try { - if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) { - // the slot is either free or releasing at the moment - final String message = "Could not mark slot " + jobId + " active."; - log.debug(message); - jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(), - leaderId, new Exception(message)); - } - - // remove the assigned slots so that we can free the left overs - reservedSlots.remove(acceptedSlot); - } catch (SlotNotFoundException e) { - log.debug("Could not mark slot {} active.", acceptedSlot, e); - jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(), leaderId, e); - } + for (SlotOffer acceptedSlot : acceptedSlots) { + reservedSlots.remove(acceptedSlot); } final Exception e = new Exception("The slot was rejected by the JobManager."); http://git-wip-us.apache.org/repos/asf/flink/blob/d6aed38b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index aacd329..31bf9b8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -696,4 +696,172 @@ public class TaskExecutorTest extends TestLogger { } } + + /** + * This tests task executor receive SubmitTask before OfferSlot response. + */ + @Test + public void testSubmitTaskBeforeAcceptSlot() throws Exception { + final JobID jobId = new JobID(); + + final TestingSerialRpcService rpc = new TestingSerialRpcService(); + final Configuration configuration = new Configuration(); + final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); + final ResourceID resourceId = new ResourceID("foobar"); + final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234); + final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + final TimerService<AllocationID> timerService = mock(TimerService.class); + final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class), mock(ResourceProfile.class)), timerService); + final JobManagerTable jobManagerTable = new JobManagerTable(); + final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); + final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + + final String resourceManagerAddress = "rm"; + final UUID resourceManagerLeaderId = UUID.randomUUID(); + + final String jobManagerAddress = "jm"; + final UUID jobManagerLeaderId = UUID.randomUUID(); + + final LeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService(resourceManagerAddress, resourceManagerLeaderId); + final LeaderRetrievalService jobManagerLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobManagerAddress, jobManagerLeaderId); + haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService); + haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetrievalService); + + final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class); + final InstanceID registrationId = new InstanceID(); + + when(resourceManagerGateway.registerTaskExecutor( + eq(resourceManagerLeaderId), + any(String.class), + eq(resourceId), + any(SlotReport.class), + any(Time.class))).thenReturn( + FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L))); + + final ResourceID jmResourceId = new ResourceID(jobManagerAddress); + final int blobPort = 42; + + final AllocationID allocationId1 = new AllocationID(); + final AllocationID allocationId2 = new AllocationID(); + + final SlotOffer offer1 = new SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN); + + final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); + + when(jobMasterGateway.registerTaskManager( + any(String.class), + eq(taskManagerLocation), + eq(jobManagerLeaderId), + any(Time.class) + )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); + when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress); + + + rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); + rpc.registerGateway(jobManagerAddress, jobMasterGateway); + + final LibraryCacheManager libraryCacheManager = mock(LibraryCacheManager.class); + when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader()); + + final JobManagerConnection jobManagerConnection = new JobManagerConnection( + jobMasterGateway, + jobManagerLeaderId, + mock(TaskManagerActions.class), + mock(CheckpointResponder.class), + libraryCacheManager, + mock(ResultPartitionConsumableNotifier.class), + 
mock(PartitionProducerStateChecker.class)); + + jobManagerTable.put(jobId, jobManagerConnection); + + try { + final TaskExecutor taskManager = new TaskExecutor( + taskManagerConfiguration, + taskManagerLocation, + rpc, + mock(MemoryManager.class), + mock(IOManager.class), + mock(NetworkEnvironment.class), + haServices, + mock(MetricRegistry.class), + mock(TaskManagerMetricGroup.class), + mock(BroadcastVariableManager.class), + mock(FileCache.class), + taskSlotTable, + jobManagerTable, + jobLeaderService, + testingFatalErrorHandler); + taskManager.start(); + taskSlotTable.allocateSlot(0, jobId, allocationId1, Time.milliseconds(10000L)); + taskSlotTable.allocateSlot(1, jobId, allocationId2, Time.milliseconds(10000L)); + + final JobVertexID jobVertexId = new JobVertexID(); + + JobInformation jobInformation = new JobInformation( + jobId, + name.getMethodName(), + new SerializedValue<>(new ExecutionConfig()), + new Configuration(), + Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList()); + + TaskInformation taskInformation = new TaskInformation( + jobVertexId, + "test task", + 1, + 1, + TestInvokable.class.getName(), + new Configuration()); + + SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(jobInformation); + SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(taskInformation); + + final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( + serializedJobInformation, + serializedJobVertexInformation, + new ExecutionAttemptID(), + allocationId1, + 0, + 0, + 0, + null, + Collections.<ResultPartitionDeploymentDescriptor>emptyList(), + Collections.<InputGateDeploymentDescriptor>emptyList()); + + CompletableFuture<Iterable<SlotOffer>> offerResultFuture = new FlinkCompletableFuture<>(); + + // submit task first and then return acceptance response + when( + jobMasterGateway.offerSlots( + any(ResourceID.class), + any(Iterable.class), + eq(jobManagerLeaderId), + any(Time.class))) + 
.thenReturn(offerResultFuture); + + // we have to add the job after the TaskExecutor, because otherwise the service has not + // been properly started. This will also offer the slots to the job master + jobLeaderService.addJob(jobId, jobManagerAddress); + + verify(jobMasterGateway).offerSlots(any(ResourceID.class), any(Iterable.class), eq(jobManagerLeaderId), any(Time.class)); + + // submit the task without having acknowledge the offered slots + taskManager.submitTask(tdd, jobManagerLeaderId); + + // acknowledge the offered slots + offerResultFuture.complete(Collections.singleton(offer1)); + + verify(resourceManagerGateway).notifySlotAvailable(eq(resourceManagerLeaderId), eq(registrationId), eq(new SlotID(resourceId, 1))); + + assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1)); + assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2)); + assertTrue(taskSlotTable.isSlotFree(1)); + + // check if a concurrent error occurred + testingFatalErrorHandler.rethrowError(); + } finally { + rpc.stopService(); + } + + } }
