shuai-xu commented on a change in pull request #7227: [FLINK-11059] [runtime] do not add releasing failed slot to free slots
URL: https://github.com/apache/flink/pull/7227#discussion_r293256279
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -1420,6 +1424,41 @@ private void timeoutSlot(AllocationID allocationId, UUID ticket) {
}
}
+ /**
+ * Syncs the slots of this task executor with a slot snapshot from the job master.
+ * Slots that exist only in the job master are reported to the job master as failed.
+ * Slots that exist only in the task executor are released to the resource manager.
+ *
+ * @param slotSnapshot snapshot of the slots as seen by the job master
+ */
+ private void syncSlotWithSnapshotFromJobMaster(SlotSnapshot slotSnapshot) {
+     JobManagerConnection jobManagerConnection = jobManagerTable.get(slotSnapshot.getJobId());
+     if (jobManagerConnection != null) {
+         JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
+
+         for (OfferedSlot offeredSlot : slotSnapshot.getSlots()) {
+             AllocationID allocationId = offeredSlot.getAllocationId();
+             if (!taskSlotTable.isAllocated(offeredSlot.getSlotIndex(), slotSnapshot.getJobId(), allocationId)) {
+                 jobMasterGateway.failSlot(
+                     getResourceID(),
+                     allocationId,
+                     new Exception("Slot " + offeredSlot.getSlotIndex() + " is not allocated by " + allocationId));
+             }
+         }
+     }
+
+     Set<AllocationID> diffWithJobMaster = new HashSet<>();
+     Iterator<AllocationID> slotsTaskManagerSide = taskSlotTable.getActiveSlots(slotSnapshot.getJobId());
+     while (slotsTaskManagerSide.hasNext()) {
+         AllocationID allocationId = slotsTaskManagerSide.next();
+         if (!slotSnapshot.contains(allocationId)) {
+             diffWithJobMaster.add(allocationId);
+         }
+     }
+     diffWithJobMaster.forEach(
+         (allocationId) -> freeSlotInternal(allocationId, new Exception(allocationId + " not exists in job master.")));
Review comment:
Good point
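
For readers skimming the hunk above: the sync it performs amounts to two set differences over allocation IDs. The sketch below is only an illustration of that idea, not code from the PR. `SlotReconciler` and `onlyIn` are hypothetical names, and plain strings stand in for `AllocationID`. It assumes both sides can be materialized as sets before any slot is failed or freed.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper, not part of Flink: two-way reconciliation of slot IDs. */
final class SlotReconciler {

    /** Returns the IDs contained in {@code left} but not in {@code right}, in left's order. */
    static <T> Set<T> onlyIn(Set<T> left, Set<T> right) {
        // Copy first so neither input is mutated while the difference is computed.
        Set<T> result = new LinkedHashSet<>(left);
        result.removeAll(right);
        return result;
    }

    public static void main(String[] args) {
        // Strings stand in for allocation IDs (assumption for this sketch).
        Set<String> jobMasterSide = new LinkedHashSet<>(Arrays.asList("a1", "a2", "a3"));
        Set<String> taskExecutorSide = new LinkedHashSet<>(Arrays.asList("a2", "a3", "a4"));

        // Known only to the job master: these would be reported back as failed.
        Set<String> toFail = onlyIn(jobMasterSide, taskExecutorSide);
        // Known only to the task executor: these would be freed.
        Set<String> toFree = onlyIn(taskExecutorSide, jobMasterSide);

        System.out.println("fail on job master: " + toFail);
        System.out.println("free on task executor: " + toFree);
    }
}
```

Collecting the difference into a separate set before acting on it, as the hunk does with `diffWithJobMaster`, keeps the freeing step from modifying the slot table while its iterator is still in use.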