asfgit closed pull request #6780: [FLINK-9932] [runtime] fix slot leak when task executor offer slot to job master timeout URL: https://github.com/apache/flink/pull/6780
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index ae69e561bc7..599bee99da4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -444,11 +444,16 @@ public void start() throws Exception { throw new TaskSubmissionException(message); } - if (!taskSlotTable.existsActiveSlot(jobId, tdd.getAllocationId())) { - final String message = "No task slot allocated for job ID " + jobId + - " and allocation ID " + tdd.getAllocationId() + '.'; - log.debug(message); - throw new TaskSubmissionException(message); + try { + if (!taskSlotTable.markSlotActive(tdd.getAllocationId()) && + !taskSlotTable.isActive(tdd.getTargetSlotNumber(), tdd.getJobId(), tdd.getAllocationId())) { + final String message = "No task slot allocated for job ID " + jobId + + " and allocation ID " + tdd.getAllocationId() + '.'; + log.debug(message); + throw new TaskSubmissionException(message); + } + } catch (SlotNotFoundException e) { + throw new TaskSubmissionException(e); } // re-integrate offloaded data: @@ -1050,18 +1055,6 @@ private void offerSlotsToJobManager(final JobID jobId) { while (reservedSlotsIterator.hasNext()) { SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer(); - try { - if (!taskSlotTable.markSlotActive(offer.getAllocationId())) { - // the slot is either free or releasing at the moment - final String message = "Could not mark slot " + jobId + " active."; - log.debug(message); - jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), new Exception(message)); - } - } catch (SlotNotFoundException e) { - final String message = "Could not mark slot " + jobId + " active."; - jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), new Exception(message)); - continue; - } reservedSlots.add(offer); } @@ -1091,7 +1084,20 @@ private void offerSlotsToJobManager(final JobID jobId) { if (isJobManagerConnectionValid(jobId, jobMasterId)) { // mark accepted slots active for (SlotOffer acceptedSlot : acceptedSlots) { - reservedSlots.remove(acceptedSlot); + try { + if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId()) && + !taskSlotTable.isActive(acceptedSlot.getSlotIndex(), jobId, acceptedSlot.getAllocationId())) { + // the slot is either free or releasing at the moment + final String message = "Could not mark slot " + jobId + " active."; + log.debug(message); + jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(), new Exception(message)); + } else { + reservedSlots.remove(acceptedSlot); + } + } catch (SlotNotFoundException e) { + final String message = "Not find slot " + acceptedSlot.getAllocationId() + " in task executor."; + jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(), new Exception(message)); + } } final Exception e = new Exception("The slot was rejected by the JobManager."); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java index c94e278fa96..480e22066ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java @@ -378,6 +378,20 @@ public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) { return taskSlot.isAllocated(jobId, allocationId); } + /** + * Check whether the slot for the given index is active for the given job and allocation id. + * + * @param index of the task slot + * @param jobId for which the task slot should be allocated + * @param allocationId which should match the task slot's allocation id + * @return True if the given task slot is active for the given job and allocation id + */ + public boolean isActive(int index, JobID jobId, AllocationID allocationId) { + TaskSlot taskSlot = taskSlots.get(index); + + return taskSlot.isActive(jobId, allocationId); + } + /** * Check whether there exists an active slot for the given job and allocation id. * diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 168b251da26..cfa9039f4aa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -78,6 +78,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; @@ -127,6 +128,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import static org.hamcrest.Matchers.contains; @@ -742,7 +744,7 @@ public void testTaskSubmission() throws Exception { jobManagerTable.put(jobId, jobManagerConnection); final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class); - when(taskSlotTable.existsActiveSlot(eq(jobId), eq(allocationId))).thenReturn(true); + when(taskSlotTable.markSlotActive(eq(allocationId))).thenReturn(true); when(taskSlotTable.addTask(any(Task.class))).thenReturn(true); TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher(); @@ -960,7 +962,7 @@ public void testSlotAcceptance() throws Exception { when(jobMasterGateway.offerSlots( any(ResourceID.class), any(Collection.class), any(Time.class))) - .thenReturn(CompletableFuture.completedFuture((Collection<SlotOffer>)Collections.singleton(offer1))); + .thenReturn(CompletableFuture.completedFuture((Collection<SlotOffer>) Collections.singleton(offer1))); rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); rpc.registerGateway(jobManagerAddress, jobMasterGateway); @@ -1677,6 +1679,87 @@ public void testInitialSlotReportFailure() throws Exception { } } + /** + * Tests that offers slots to job master timeout and retry. + */ + @Test + public void testOfferSlotToJobMasterTimeout() throws Exception { + final TaskSlotTable taskSlotTable = new TaskSlotTable( + Arrays.asList(ResourceProfile.UNKNOWN, ResourceProfile.UNKNOWN), + timerService); + final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() + .setTaskSlotTable(taskSlotTable) + .setTaskManagerLocation(taskManagerLocation) + .build(); + final TaskExecutor taskExecutor = new TaskExecutor( + rpc, + taskManagerConfiguration, + haServices, + taskManagerServices, + new HeartbeatServices(10000, 60000), + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), + dummyBlobCacheService, + testingFatalErrorHandler); + + final UUID resourceManagerLeaderId = UUID.randomUUID(); + + final String jobManagerAddress = "jm"; + final UUID jobManagerLeaderId = UUID.randomUUID(); + + final AllocationID allocationId = new AllocationID(); + final SlotOffer offer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); + + final CompletableFuture<ResourceID> initailSlotReportFuture = new CompletableFuture<>(); + + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + testingResourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> { + initailSlotReportFuture.complete(null); + return CompletableFuture.completedFuture(Acknowledge.get()); + + }); + rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); + resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), resourceManagerLeaderId); + + final ControlledJobMasterGateway jobMasterGateway = + new ControlledJobMasterGateway(jobManagerAddress, jobManagerLeaderId, offer); + rpc.registerGateway(jobManagerAddress, jobMasterGateway); + jobManagerLeaderRetriever.notifyListener(jobManagerAddress, jobManagerLeaderId); + + taskExecutor.start(); + + try { + + // wait for the connection to the ResourceManager + initailSlotReportFuture.get(); + + taskExecutor.requestSlot( + new SlotID(new ResourceID("tm"), 0), + jobId, + allocationId, + jobManagerAddress, + ResourceManagerId.fromUuid(resourceManagerLeaderId), + timeout).get(); + + long startTime = System.currentTimeMillis(); + while (2 > jobMasterGateway.getOfferCount()) { + Thread.sleep(100); + if (System.currentTimeMillis() - startTime > 2000) { + fail("Didn't Offer slot to job master as expected"); + } + } + + assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId)); + assertTrue(taskSlotTable.isSlotFree(1)); + assertEquals(2, jobMasterGateway.getOfferCount()); + + // check if a concurrent error occurred + testingFatalErrorHandler.rethrowError(); + } finally { + RpcUtils.terminateRpcEndpoint(taskExecutor, timeout); + } + } + @Nonnull private TaskExecutor createTaskExecutor(TaskManagerServices taskManagerServices) { return new TaskExecutor( @@ -1786,4 +1869,57 @@ public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTar monitoredTargets.offer(resourceID); } } + + class ControlledJobMasterGateway extends TestingJobMasterGateway { + + private int index = 0; + + private final String jobManagerAddress; + + private final UUID jobManagerLeaderId; + + private SlotOffer offer; + + ControlledJobMasterGateway(String jobManagerAddress, UUID jobManagerLeaderId, SlotOffer slotOffer) { + this.jobManagerAddress = jobManagerAddress; + this.jobManagerLeaderId = jobManagerLeaderId; + this.offer = slotOffer; + } + + @Override + public CompletableFuture<RegistrationResponse> registerTaskManager( + final String taskManagerRpcAddress, + final TaskManagerLocation taskManagerLocation, + @RpcTimeout final Time timeout) { + return CompletableFuture.completedFuture(new JMTMRegistrationSuccess(ResourceID.generate())); + } + + @Override + public String getHostname() { + return jobManagerAddress; + } + + @Override + public CompletableFuture<Collection<SlotOffer>> offerSlots(ResourceID taskManagerId, Collection<SlotOffer> slots, Time timeout) { + if (index++ == 0) { + return FutureUtils.completedExceptionally(new TimeoutException()); + } else { + return CompletableFuture.completedFuture(Collections.singleton(offer)); + } + } + + public int getOfferCount() { + return index; + } + + @Override + public CompletableFuture<Acknowledge> disconnectTaskManager(ResourceID resourceID, Exception cause) { + return CompletableFuture.completedFuture(Acknowledge.get()); + } + + @Override + public JobMasterId getFencingToken() { + return new JobMasterId(jobManagerLeaderId); + } + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services