This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 037687c  [hotfix][runtime] Log slot pool status if unable to fulfill 
job requirements
037687c is described below

commit 037687c2ea1f1b81cc77b14c11ad529b30da4db5
Author: Roman Khachatryan <[email protected]>
AuthorDate: Mon Jul 12 02:27:47 2021 +0200

    [hotfix][runtime] Log slot pool status if unable to fulfill job requirements
---
 .../runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java    | 9 ++++++---
 .../runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java   | 8 ++++++++
 .../resourcemanager/slotmanager/DeclarativeSlotManager.java      | 5 ++++-
 3 files changed, 18 insertions(+), 4 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index 1e9a2b4..c9dafd2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -348,14 +348,17 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
             Collection<ResourceRequirement> acquiredResources) {
         assertRunningInMainThread();
 
-        failPendingRequests();
+        failPendingRequests(acquiredResources);
     }
 
-    private void failPendingRequests() {
+    private void failPendingRequests(Collection<ResourceRequirement> 
acquiredResources) {
         if (!pendingRequests.isEmpty()) {
             final NoResourceAvailableException cause =
                     new NoResourceAvailableException(
-                            "Could not acquire the minimum required 
resources.");
+                            "Could not acquire the minimum required resources. 
Acquired: "
+                                    + acquiredResources
+                                    + ". Current slot pool status: "
+                                    + getSlotServiceStatus());
 
             cancelPendingRequests(
                     request -> !isBatchSlotRequestTimeoutCheckDisabled || 
!request.isBatchRequest(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
index 2c19471..ec1a970 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
@@ -306,4 +306,12 @@ public class DeclarativeSlotPoolService implements 
SlotPoolService {
         STARTED,
         CLOSED,
     }
+
+    protected String getSlotServiceStatus() {
+        return String.format(
+                "Registered TMs: %d, registered slots: %d free slots: %d",
+                registeredTaskManagers.size(),
+                declarativeSlotPool.getAllSlotsInformation().size(),
+                declarativeSlotPool.getFreeSlotsInformation().size());
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
index ab8e155..aa8238f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
@@ -647,7 +647,10 @@ public class DeclarativeSlotManager implements SlotManager 
{
                     pendingSlots = allocationResult.getNewAvailableResources();
                     if (!allocationResult.isSuccessfulAllocating()
                             && sendNotEnoughResourceNotifications) {
-                        LOG.warn("Could not fulfill resource requirements of 
job {}.", jobId);
+                        LOG.warn(
+                                "Could not fulfill resource requirements of 
job {}. Free slots: {}",
+                                jobId,
+                                slotTracker.getFreeSlots().size());
                         resourceActions.notifyNotEnoughResourcesAvailable(
                                 jobId, 
resourceTracker.getAcquiredResources(jobId));
                         return pendingSlots;

Reply via email to