asfgit closed pull request #6780: [FLINK-9932] [runtime] fix slot leak when
task executor offer slot to job master timeout
URL: https://github.com/apache/flink/pull/6780
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a merged pull request from a fork,
the diff is reproduced below for the sake of provenance:
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index ae69e561bc7..599bee99da4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -444,11 +444,16 @@ public void start() throws Exception {
throw new TaskSubmissionException(message);
}
- if (!taskSlotTable.existsActiveSlot(jobId,
tdd.getAllocationId())) {
- final String message = "No task slot allocated
for job ID " + jobId +
- " and allocation ID " +
tdd.getAllocationId() + '.';
- log.debug(message);
- throw new TaskSubmissionException(message);
+ try {
+ if
(!taskSlotTable.markSlotActive(tdd.getAllocationId()) &&
+
!taskSlotTable.isActive(tdd.getTargetSlotNumber(), tdd.getJobId(),
tdd.getAllocationId())) {
+ final String message = "No task slot
allocated for job ID " + jobId +
+ " and allocation ID " +
tdd.getAllocationId() + '.';
+ log.debug(message);
+ throw new
TaskSubmissionException(message);
+ }
+ } catch (SlotNotFoundException e) {
+ throw new TaskSubmissionException(e);
}
// re-integrate offloaded data:
@@ -1050,18 +1055,6 @@ private void offerSlotsToJobManager(final JobID jobId) {
while (reservedSlotsIterator.hasNext()) {
SlotOffer offer =
reservedSlotsIterator.next().generateSlotOffer();
- try {
- if
(!taskSlotTable.markSlotActive(offer.getAllocationId())) {
- // the slot is either
free or releasing at the moment
- final String message =
"Could not mark slot " + jobId + " active.";
- log.debug(message);
-
jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), new
Exception(message));
- }
- } catch (SlotNotFoundException e) {
- final String message = "Could
not mark slot " + jobId + " active.";
-
jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), new
Exception(message));
- continue;
- }
reservedSlots.add(offer);
}
@@ -1091,7 +1084,20 @@ private void offerSlotsToJobManager(final JobID jobId) {
if
(isJobManagerConnectionValid(jobId, jobMasterId)) {
// mark
accepted slots active
for (SlotOffer
acceptedSlot : acceptedSlots) {
-
reservedSlots.remove(acceptedSlot);
+ try {
+
if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId()) &&
+
!taskSlotTable.isActive(acceptedSlot.getSlotIndex(), jobId,
acceptedSlot.getAllocationId())) {
+
// the slot is either free or releasing at the moment
+
final String message = "Could not mark slot " + jobId + " active.";
+
log.debug(message);
+
jobMasterGateway.failSlot(getResourceID(),
acceptedSlot.getAllocationId(), new Exception(message));
+
} else {
+
reservedSlots.remove(acceptedSlot);
+
}
+ } catch
(SlotNotFoundException e) {
+
final String message = "Not find slot " + acceptedSlot.getAllocationId() + " in
task executor.";
+
jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(), new
Exception(message));
+ }
}
final Exception
e = new Exception("The slot was rejected by the JobManager.");
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index c94e278fa96..480e22066ba 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -378,6 +378,20 @@ public boolean isAllocated(int index, JobID jobId,
AllocationID allocationId) {
return taskSlot.isAllocated(jobId, allocationId);
}
+ /**
+ * Check whether the slot for the given index is active for the given
job and allocation id.
+ *
+ * @param index of the task slot
+ * @param jobId for which the task slot should be allocated
+ * @param allocationId which should match the task slot's allocation id
+ * @return True if the given task slot is active for the given job and
allocation id
+ */
+ public boolean isActive(int index, JobID jobId, AllocationID
allocationId) {
+ TaskSlot taskSlot = taskSlots.get(index);
+
+ return taskSlot.isActive(jobId, allocationId);
+ }
+
/**
* Check whether there exists an active slot for the given job and
allocation id.
*
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 168b251da26..cfa9039f4aa 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -78,6 +78,7 @@
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
@@ -127,6 +128,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import static org.hamcrest.Matchers.contains;
@@ -742,7 +744,7 @@ public void testTaskSubmission() throws Exception {
jobManagerTable.put(jobId, jobManagerConnection);
final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
- when(taskSlotTable.existsActiveSlot(eq(jobId),
eq(allocationId))).thenReturn(true);
+
when(taskSlotTable.markSlotActive(eq(allocationId))).thenReturn(true);
when(taskSlotTable.addTask(any(Task.class))).thenReturn(true);
TaskEventDispatcher taskEventDispatcher = new
TaskEventDispatcher();
@@ -960,7 +962,7 @@ public void testSlotAcceptance() throws Exception {
when(jobMasterGateway.offerSlots(
any(ResourceID.class), any(Collection.class),
any(Time.class)))
-
.thenReturn(CompletableFuture.completedFuture((Collection<SlotOffer>)Collections.singleton(offer1)));
+
.thenReturn(CompletableFuture.completedFuture((Collection<SlotOffer>)
Collections.singleton(offer1)));
rpc.registerGateway(resourceManagerAddress,
resourceManagerGateway);
rpc.registerGateway(jobManagerAddress, jobMasterGateway);
@@ -1677,6 +1679,87 @@ public void testInitialSlotReportFailure() throws
Exception {
}
}
+ /**
+ * Tests that offers slots to job master timeout and retry.
+ */
+ @Test
+ public void testOfferSlotToJobMasterTimeout() throws Exception {
+ final TaskSlotTable taskSlotTable = new TaskSlotTable(
+ Arrays.asList(ResourceProfile.UNKNOWN,
ResourceProfile.UNKNOWN),
+ timerService);
+ final TaskManagerLocation taskManagerLocation = new
LocalTaskManagerLocation();
+ final TaskManagerServices taskManagerServices = new
TaskManagerServicesBuilder()
+ .setTaskSlotTable(taskSlotTable)
+ .setTaskManagerLocation(taskManagerLocation)
+ .build();
+ final TaskExecutor taskExecutor = new TaskExecutor(
+ rpc,
+ taskManagerConfiguration,
+ haServices,
+ taskManagerServices,
+ new HeartbeatServices(10000, 60000),
+
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+ dummyBlobCacheService,
+ testingFatalErrorHandler);
+
+ final UUID resourceManagerLeaderId = UUID.randomUUID();
+
+ final String jobManagerAddress = "jm";
+ final UUID jobManagerLeaderId = UUID.randomUUID();
+
+ final AllocationID allocationId = new AllocationID();
+ final SlotOffer offer = new SlotOffer(allocationId, 0,
ResourceProfile.UNKNOWN);
+
+ final CompletableFuture<ResourceID> initailSlotReportFuture =
new CompletableFuture<>();
+
+ final TestingResourceManagerGateway
testingResourceManagerGateway = new TestingResourceManagerGateway();
+
testingResourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3
-> {
+ initailSlotReportFuture.complete(null);
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+
+ });
+ rpc.registerGateway(testingResourceManagerGateway.getAddress(),
testingResourceManagerGateway);
+
resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(),
resourceManagerLeaderId);
+
+ final ControlledJobMasterGateway jobMasterGateway =
+ new
ControlledJobMasterGateway(jobManagerAddress, jobManagerLeaderId, offer);
+ rpc.registerGateway(jobManagerAddress, jobMasterGateway);
+ jobManagerLeaderRetriever.notifyListener(jobManagerAddress,
jobManagerLeaderId);
+
+ taskExecutor.start();
+
+ try {
+
+ // wait for the connection to the ResourceManager
+ initailSlotReportFuture.get();
+
+ taskExecutor.requestSlot(
+ new SlotID(new ResourceID("tm"), 0),
+ jobId,
+ allocationId,
+ jobManagerAddress,
+
ResourceManagerId.fromUuid(resourceManagerLeaderId),
+ timeout).get();
+
+ long startTime = System.currentTimeMillis();
+ while (2 > jobMasterGateway.getOfferCount()) {
+ Thread.sleep(100);
+ if (System.currentTimeMillis() - startTime >
2000) {
+ fail("Didn't Offer slot to job master
as expected");
+ }
+ }
+
+ assertTrue(taskSlotTable.existsActiveSlot(jobId,
allocationId));
+ assertTrue(taskSlotTable.isSlotFree(1));
+ assertEquals(2, jobMasterGateway.getOfferCount());
+
+ // check if a concurrent error occurred
+ testingFatalErrorHandler.rethrowError();
+ } finally {
+ RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+ }
+ }
+
@Nonnull
private TaskExecutor createTaskExecutor(TaskManagerServices
taskManagerServices) {
return new TaskExecutor(
@@ -1786,4 +1869,57 @@ public void monitorTarget(ResourceID resourceID,
HeartbeatTarget<O> heartbeatTar
monitoredTargets.offer(resourceID);
}
}
+
+ class ControlledJobMasterGateway extends TestingJobMasterGateway {
+
+ private int index = 0;
+
+ private final String jobManagerAddress;
+
+ private final UUID jobManagerLeaderId;
+
+ private SlotOffer offer;
+
+ ControlledJobMasterGateway(String jobManagerAddress, UUID
jobManagerLeaderId, SlotOffer slotOffer) {
+ this.jobManagerAddress = jobManagerAddress;
+ this.jobManagerLeaderId = jobManagerLeaderId;
+ this.offer = slotOffer;
+ }
+
+ @Override
+ public CompletableFuture<RegistrationResponse>
registerTaskManager(
+ final String taskManagerRpcAddress,
+ final TaskManagerLocation taskManagerLocation,
+ @RpcTimeout final Time timeout) {
+ return CompletableFuture.completedFuture(new
JMTMRegistrationSuccess(ResourceID.generate()));
+ }
+
+ @Override
+ public String getHostname() {
+ return jobManagerAddress;
+ }
+
+ @Override
+ public CompletableFuture<Collection<SlotOffer>>
offerSlots(ResourceID taskManagerId, Collection<SlotOffer> slots, Time timeout)
{
+ if (index++ == 0) {
+ return FutureUtils.completedExceptionally(new
TimeoutException());
+ } else {
+ return
CompletableFuture.completedFuture(Collections.singleton(offer));
+ }
+ }
+
+ public int getOfferCount() {
+ return index;
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge>
disconnectTaskManager(ResourceID resourceID, Exception cause) {
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+ }
+
+ @Override
+ public JobMasterId getFencingToken() {
+ return new JobMasterId(jobManagerLeaderId);
+ }
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services