shuai-xu commented on a change in pull request #7227: [FLINK-11059] [runtime] 
do not add releasing failed slot to free slots
URL: https://github.com/apache/flink/pull/7227#discussion_r290688424
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
 ##########
 @@ -759,37 +759,43 @@ private void checkIdleSlot() {
                final FlinkException cause = new FlinkException("Releasing idle 
slot.");
 
                for (AllocatedSlot expiredSlot : expiredSlots) {
-                       final AllocationID allocationID = 
expiredSlot.getAllocationId();
-                       if (availableSlots.tryRemove(allocationID) != null) {
-
-                               log.info("Releasing idle slot [{}].", 
allocationID);
-                               final CompletableFuture<Acknowledge> 
freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
-                                       allocationID,
-                                       cause,
-                                       rpcTimeout);
-
-                               FutureUtils.whenCompleteAsyncIfNotDone(
-                                       freeSlotFuture,
-                                       componentMainThreadExecutor,
-                                       (Acknowledge ignored, Throwable 
throwable) -> {
-                                               if (throwable != null) {
-                                                       if 
(registeredTaskManagers.contains(expiredSlot.getTaskManagerId())) {
-                                                               
log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. " +
-                                                                               
"Trying to fulfill a different slot request.", allocationID, 
expiredSlot.getTaskManagerId(),
-                                                                       
throwable);
-                                                               
tryFulfillSlotRequestOrMakeAvailable(expiredSlot);
-                                                       } else {
-                                                               
log.debug("Releasing slot [{}] failed and owning TaskExecutor {} is no " +
-                                                                       "longer 
registered. Discarding slot.", allocationID, expiredSlot.getTaskManagerId());
-                                                       }
-                                               }
-                                       });
+
+                       if 
(availableSlots.tryRemove(expiredSlot.getAllocationId()) != null) {
+                               releaseSlotToTaskManager(expiredSlot, cause);
                        }
                }
 
                scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
        }
 
+       private void releaseSlotToTaskManager(AllocatedSlot expiredSlot, 
FlinkException cause) {
+               final AllocationID allocationID = expiredSlot.getAllocationId();
+               log.info("Releasing idle slot [{}].", allocationID);
+
+               final CompletableFuture<Acknowledge> freeSlotFuture = 
expiredSlot.getTaskManagerGateway().freeSlot(
+                               allocationID,
+                               cause,
+                               rpcTimeout);
+
+               FutureUtils.whenCompleteAsyncIfNotDone(
+                               freeSlotFuture,
+                               componentMainThreadExecutor,
+                               (Acknowledge ignored, Throwable throwable) -> {
+                                       if (throwable != null) {
+                                               if (throwable instanceof 
TimeoutException) {
+                                                       log.debug("Releasing 
slot [{}] of registered TaskExecutor {} timeout. " +
+                                                                               
        "Trying to release it again.",
+                                                                       
allocationID, expiredSlot.getTaskManagerId(), throwable);
+                                                       
releaseSlotToTaskManager(expiredSlot, cause);
 
 Review comment:
   @tillrohrmann, personally I prefer retrying over periodic heartbeat reports 
for resolving status conflicts in a distributed system. Firstly, adding more and 
more information to the heartbeat will increase the load on the master, 
especially when it has thousands of task managers, and most of those reports are 
unnecessary because such conflicts do not happen often. Secondly, it makes the 
duration of a conflict depend on the heartbeat interval: if we extend the 
heartbeat interval to 1 minute, for example, the job master may need up to 
1 minute to reconcile with the task manager.
   However, if you insist on using heartbeat reports, I will change it according 
to your suggestion.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to