tillrohrmann commented on a change in pull request #16457:
URL: https://github.com/apache/flink/pull/16457#discussion_r667791324



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
##########
@@ -647,7 +647,12 @@ private ResourceCounter tryFulfillRequirementsWithPendingSlots(
                     pendingSlots = allocationResult.getNewAvailableResources();
                     if (!allocationResult.isSuccessfulAllocating()
                             && sendNotEnoughResourceNotifications) {
-                        LOG.warn("Could not fulfill resource requirements of job {}.", jobId);
+                        // TODO (review): free slots are logged as zero here, but non-zero in
+                        // JobMaster.slotPoolService

Review comment:
       Can we resolve this TODO? I don't really understand what the problem is.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
##########
@@ -348,14 +348,17 @@ public void notifyNotEnoughResourcesAvailable(
             Collection<ResourceRequirement> acquiredResources) {
         assertRunningInMainThread();
 
-        failPendingRequests();
+        failPendingRequests(acquiredResources);
     }
 
-    private void failPendingRequests() {
+    private void failPendingRequests(Collection<ResourceRequirement> acquiredResources) {
         if (!pendingRequests.isEmpty()) {
             final NoResourceAvailableException cause =
                     new NoResourceAvailableException(
-                            "Could not acquire the minimum required resources.");
+                            "Could not acquire the minimum required resources. Acquired: "
+                                    + acquiredResources
+                                    + ". Current slot service status: "

Review comment:
       ```suggestion
                                       + ". Current slot pool status: "
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
##########
@@ -306,4 +306,12 @@ public AllocatedSlotReport createAllocatedSlotReport(ResourceID taskManagerId) {
         STARTED,
         CLOSED,
     }
+
+    protected String getSlotServiceStatus() {
+        return String.format(
+                "registered TMs: %d, registered slots: %d free slots: %d",

Review comment:
       ```suggestion
                   "Registered TMs: %d, registered slots: %d free slots: %d",
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
##########
@@ -647,7 +647,12 @@ private ResourceCounter tryFulfillRequirementsWithPendingSlots(
                     pendingSlots = allocationResult.getNewAvailableResources();
                     if (!allocationResult.isSuccessfulAllocating()
                             && sendNotEnoughResourceNotifications) {
-                        LOG.warn("Could not fulfill resource requirements of job {}.", jobId);
+                        // TODO (review): free slots are logged as zero here, but non-zero in
+                        // JobMaster.slotPoolService

Review comment:
       This log statement happens on the `ResourceManager`. The other log 
statement happens on the `JobMaster`. It could be the case that the RM has 
assigned all its slots to the JM (i.e. the RM has 0 free slots) but the JM 
hasn't assigned them yet.
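
       To make the two views concrete, here is a toy model of the bookkeeping. It is not Flink code: `SlotViewSketch`, `ResourceManagerView`, `JobMasterView` and all of their methods are made-up names, and the model assumes a single job that receives every slot the RM has.

```java
/** Toy model (not Flink code) of why the RM and the JM can report different free-slot counts. */
public class SlotViewSketch {

    /** Hypothetical JobMaster-side view: slots received from the RM but not yet given to tasks. */
    static final class JobMasterView {
        private int freeSlots;

        void offerSlots(int count) {
            freeSlots += count;
        }

        /** Hands one free slot to a task; returns false if none is left. */
        boolean assignSlotToTask() {
            if (freeSlots == 0) {
                return false;
            }
            freeSlots--;
            return true;
        }

        int freeSlots() {
            return freeSlots;
        }
    }

    /** Hypothetical ResourceManager-side view: slots not yet allocated to any job. */
    static final class ResourceManagerView {
        private int freeSlots;

        ResourceManagerView(int registeredSlots) {
            this.freeSlots = registeredSlots;
        }

        /** Allocates every free slot to the given job (assumption: only one job exists). */
        void allocateAllTo(JobMasterView jobMaster) {
            jobMaster.offerSlots(freeSlots);
            freeSlots = 0;
        }

        int freeSlots() {
            return freeSlots;
        }
    }

    public static void main(String[] args) {
        ResourceManagerView rm = new ResourceManagerView(3);
        JobMasterView jm = new JobMasterView();

        rm.allocateAllTo(jm);

        // From the RM's perspective nothing is free any more ...
        System.out.println("RM free slots: " + rm.freeSlots());
        // ... while the JM still holds slots it has not handed to tasks.
        System.out.println("JM free slots: " + jm.freeSlots());
    }
}
```

       In this sketch the RM reports 0 free slots and the JM reports 3 at the same moment, which would match the pattern described in the TODO if the real components behave analogously.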




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
