asfgit closed pull request #6780: [FLINK-9932] [runtime] fix slot leak when task executor offer slot to job master timeout
URL: https://github.com/apache/flink/pull/6780
 
 
   

This pull request was merged from a forked repository. Because GitHub hides
the original diff of a fork's pull request once it has been merged, the diff
is reproduced below for the sake of provenance:

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index ae69e561bc7..599bee99da4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -444,11 +444,16 @@ public void start() throws Exception {
                                throw new TaskSubmissionException(message);
                        }
 
-                       if (!taskSlotTable.existsActiveSlot(jobId, 
tdd.getAllocationId())) {
-                               final String message = "No task slot allocated 
for job ID " + jobId +
-                                       " and allocation ID " + 
tdd.getAllocationId() + '.';
-                               log.debug(message);
-                               throw new TaskSubmissionException(message);
+                       try {
+                               if 
(!taskSlotTable.markSlotActive(tdd.getAllocationId()) &&
+                                               
!taskSlotTable.isActive(tdd.getTargetSlotNumber(), tdd.getJobId(), 
tdd.getAllocationId())) {
+                                       final String message = "No task slot 
allocated for job ID " + jobId +
+                                                       " and allocation ID " + 
tdd.getAllocationId() + '.';
+                                       log.debug(message);
+                                       throw new 
TaskSubmissionException(message);
+                               }
+                       } catch (SlotNotFoundException e) {
+                               throw new TaskSubmissionException(e);
                        }
 
                        // re-integrate offloaded data:
@@ -1050,18 +1055,6 @@ private void offerSlotsToJobManager(final JobID jobId) {
 
                                while (reservedSlotsIterator.hasNext()) {
                                        SlotOffer offer = 
reservedSlotsIterator.next().generateSlotOffer();
-                                       try {
-                                               if 
(!taskSlotTable.markSlotActive(offer.getAllocationId())) {
-                                                       // the slot is either 
free or releasing at the moment
-                                                       final String message = 
"Could not mark slot " + jobId + " active.";
-                                                       log.debug(message);
-                                                       
jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), new 
Exception(message));
-                                               }
-                                       } catch (SlotNotFoundException e) {
-                                               final String message = "Could 
not mark slot " + jobId + " active.";
-                                               
jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), new 
Exception(message));
-                                               continue;
-                                       }
                                        reservedSlots.add(offer);
                                }
 
@@ -1091,7 +1084,20 @@ private void offerSlotsToJobManager(final JobID jobId) {
                                                        if 
(isJobManagerConnectionValid(jobId, jobMasterId)) {
                                                                // mark 
accepted slots active
                                                                for (SlotOffer 
acceptedSlot : acceptedSlots) {
-                                                                       
reservedSlots.remove(acceptedSlot);
+                                                                       try {
+                                                                               
if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId()) &&
+                                                                               
                !taskSlotTable.isActive(acceptedSlot.getSlotIndex(), jobId, 
acceptedSlot.getAllocationId())) {
+                                                                               
        // the slot is either free or releasing at the moment
+                                                                               
        final String message = "Could not mark slot " + jobId + " active.";
+                                                                               
        log.debug(message);
+                                                                               
        jobMasterGateway.failSlot(getResourceID(), 
acceptedSlot.getAllocationId(), new Exception(message));
+                                                                               
} else {
+                                                                               
        reservedSlots.remove(acceptedSlot);
+                                                                               
}
+                                                                       } catch 
(SlotNotFoundException e) {
+                                                                               
final String message = "Not find slot " + acceptedSlot.getAllocationId() + " in 
task executor.";
+                                                                               
jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(), new 
Exception(message));
+                                                                       }
                                                                }
 
                                                                final Exception 
e = new Exception("The slot was rejected by the JobManager.");
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index c94e278fa96..480e22066ba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -378,6 +378,20 @@ public boolean isAllocated(int index, JobID jobId, 
AllocationID allocationId) {
                return taskSlot.isAllocated(jobId, allocationId);
        }
 
+       /**
+        * Check whether the slot for the given index is active for the given 
job and allocation id.
+        *
+        * @param index of the task slot
+        * @param jobId for which the task slot should be allocated
+        * @param allocationId which should match the task slot's allocation id
+        * @return True if the given task slot is active for the given job and 
allocation id
+        */
+       public boolean isActive(int index, JobID jobId, AllocationID 
allocationId) {
+               TaskSlot taskSlot = taskSlots.get(index);
+
+               return taskSlot.isActive(jobId, allocationId);
+       }
+
        /**
         * Check whether there exists an active slot for the given job and 
allocation id.
         *
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 168b251da26..cfa9039f4aa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -78,6 +78,7 @@
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
@@ -127,6 +128,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 
 import static org.hamcrest.Matchers.contains;
@@ -742,7 +744,7 @@ public void testTaskSubmission() throws Exception {
                jobManagerTable.put(jobId, jobManagerConnection);
 
                final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
-               when(taskSlotTable.existsActiveSlot(eq(jobId), 
eq(allocationId))).thenReturn(true);
+               
when(taskSlotTable.markSlotActive(eq(allocationId))).thenReturn(true);
                when(taskSlotTable.addTask(any(Task.class))).thenReturn(true);
 
                TaskEventDispatcher taskEventDispatcher = new 
TaskEventDispatcher();
@@ -960,7 +962,7 @@ public void testSlotAcceptance() throws Exception {
 
                when(jobMasterGateway.offerSlots(
                                any(ResourceID.class), any(Collection.class), 
any(Time.class)))
-                       
.thenReturn(CompletableFuture.completedFuture((Collection<SlotOffer>)Collections.singleton(offer1)));
+                       
.thenReturn(CompletableFuture.completedFuture((Collection<SlotOffer>) 
Collections.singleton(offer1)));
 
                rpc.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
                rpc.registerGateway(jobManagerAddress, jobMasterGateway);
@@ -1677,6 +1679,87 @@ public void testInitialSlotReportFailure() throws 
Exception {
                }
        }
 
+       /**
+        * Tests that offers slots to job master timeout and retry.
+        */
+       @Test
+       public void testOfferSlotToJobMasterTimeout() throws Exception {
+               final TaskSlotTable taskSlotTable = new TaskSlotTable(
+                               Arrays.asList(ResourceProfile.UNKNOWN, 
ResourceProfile.UNKNOWN),
+                               timerService);
+               final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                               .setTaskSlotTable(taskSlotTable)
+                               .setTaskManagerLocation(taskManagerLocation)
+                               .build();
+               final TaskExecutor taskExecutor = new TaskExecutor(
+                               rpc,
+                               taskManagerConfiguration,
+                               haServices,
+                               taskManagerServices,
+                               new HeartbeatServices(10000, 60000),
+                               
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+                               dummyBlobCacheService,
+                               testingFatalErrorHandler);
+
+               final UUID resourceManagerLeaderId = UUID.randomUUID();
+
+               final String jobManagerAddress = "jm";
+               final UUID jobManagerLeaderId = UUID.randomUUID();
+
+               final AllocationID allocationId = new AllocationID();
+               final SlotOffer offer = new SlotOffer(allocationId, 0, 
ResourceProfile.UNKNOWN);
+
+               final CompletableFuture<ResourceID> initailSlotReportFuture = 
new CompletableFuture<>();
+
+               final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+               
testingResourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3
 -> {
+                       initailSlotReportFuture.complete(null);
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+
+               });
+               rpc.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
+               
resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(),
 resourceManagerLeaderId);
+
+               final ControlledJobMasterGateway jobMasterGateway =
+                               new 
ControlledJobMasterGateway(jobManagerAddress, jobManagerLeaderId, offer);
+               rpc.registerGateway(jobManagerAddress, jobMasterGateway);
+               jobManagerLeaderRetriever.notifyListener(jobManagerAddress, 
jobManagerLeaderId);
+
+               taskExecutor.start();
+
+               try {
+
+                       // wait for the connection to the ResourceManager
+                       initailSlotReportFuture.get();
+
+                       taskExecutor.requestSlot(
+                                       new SlotID(new ResourceID("tm"), 0),
+                                       jobId,
+                                       allocationId,
+                                       jobManagerAddress,
+                                       
ResourceManagerId.fromUuid(resourceManagerLeaderId),
+                                       timeout).get();
+
+                       long startTime = System.currentTimeMillis();
+                       while (2 > jobMasterGateway.getOfferCount()) {
+                               Thread.sleep(100);
+                               if (System.currentTimeMillis() - startTime > 
2000) {
+                                       fail("Didn't Offer slot to job master 
as expected");
+                               }
+                       }
+
+                       assertTrue(taskSlotTable.existsActiveSlot(jobId, 
allocationId));
+                       assertTrue(taskSlotTable.isSlotFree(1));
+                       assertEquals(2, jobMasterGateway.getOfferCount());
+
+                       // check if a concurrent error occurred
+                       testingFatalErrorHandler.rethrowError();
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+               }
+       }
+
        @Nonnull
        private TaskExecutor createTaskExecutor(TaskManagerServices 
taskManagerServices) {
                return new TaskExecutor(
@@ -1786,4 +1869,57 @@ public void monitorTarget(ResourceID resourceID, 
HeartbeatTarget<O> heartbeatTar
                        monitoredTargets.offer(resourceID);
                }
        }
+
+       class ControlledJobMasterGateway extends TestingJobMasterGateway {
+
+               private int index = 0;
+
+               private final String jobManagerAddress;
+
+               private final UUID jobManagerLeaderId;
+
+               private SlotOffer offer;
+
+               ControlledJobMasterGateway(String jobManagerAddress, UUID 
jobManagerLeaderId, SlotOffer slotOffer) {
+                       this.jobManagerAddress = jobManagerAddress;
+                       this.jobManagerLeaderId = jobManagerLeaderId;
+                       this.offer = slotOffer;
+               }
+
+               @Override
+               public CompletableFuture<RegistrationResponse> 
registerTaskManager(
+                               final String taskManagerRpcAddress,
+                               final TaskManagerLocation taskManagerLocation,
+                               @RpcTimeout final Time timeout) {
+                       return CompletableFuture.completedFuture(new 
JMTMRegistrationSuccess(ResourceID.generate()));
+               }
+
+               @Override
+               public String getHostname() {
+                       return jobManagerAddress;
+               }
+
+               @Override
+               public CompletableFuture<Collection<SlotOffer>> 
offerSlots(ResourceID taskManagerId, Collection<SlotOffer> slots, Time timeout) 
{
+                       if (index++ == 0) {
+                               return FutureUtils.completedExceptionally(new 
TimeoutException());
+                       } else {
+                               return 
CompletableFuture.completedFuture(Collections.singleton(offer));
+                       }
+               }
+
+               public int getOfferCount() {
+                       return index;
+               }
+
+               @Override
+               public CompletableFuture<Acknowledge> 
disconnectTaskManager(ResourceID resourceID, Exception cause) {
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+               }
+
+               @Override
+               public JobMasterId getFencingToken() {
+                       return new JobMasterId(jobManagerLeaderId);
+               }
+       }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to