This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 037687c [hotfix][runtime] Log slot pool status if unable to fulfill
job requirements
037687c is described below
commit 037687c2ea1f1b81cc77b14c11ad529b30da4db5
Author: Roman Khachatryan <[email protected]>
AuthorDate: Mon Jul 12 02:27:47 2021 +0200
[hotfix][runtime] Log slot pool status if unable to fulfill job requirements
---
.../runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java | 9 ++++++---
.../runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java | 8 ++++++++
.../resourcemanager/slotmanager/DeclarativeSlotManager.java | 5 ++++-
3 files changed, 18 insertions(+), 4 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index 1e9a2b4..c9dafd2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -348,14 +348,17 @@ public class DeclarativeSlotPoolBridge extends
DeclarativeSlotPoolService implem
Collection<ResourceRequirement> acquiredResources) {
assertRunningInMainThread();
- failPendingRequests();
+ failPendingRequests(acquiredResources);
}
- private void failPendingRequests() {
+ private void failPendingRequests(Collection<ResourceRequirement>
acquiredResources) {
if (!pendingRequests.isEmpty()) {
final NoResourceAvailableException cause =
new NoResourceAvailableException(
- "Could not acquire the minimum required
resources.");
+ "Could not acquire the minimum required resources.
Acquired: "
+ + acquiredResources
+ + ". Current slot pool status: "
+ + getSlotServiceStatus());
cancelPendingRequests(
request -> !isBatchSlotRequestTimeoutCheckDisabled ||
!request.isBatchRequest(),
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
index 2c19471..ec1a970 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
@@ -306,4 +306,12 @@ public class DeclarativeSlotPoolService implements
SlotPoolService {
STARTED,
CLOSED,
}
+
+ protected String getSlotServiceStatus() {
+ return String.format(
+ "Registered TMs: %d, registered slots: %d free slots: %d",
+ registeredTaskManagers.size(),
+ declarativeSlotPool.getAllSlotsInformation().size(),
+ declarativeSlotPool.getFreeSlotsInformation().size());
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
index ab8e155..aa8238f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
@@ -647,7 +647,10 @@ public class DeclarativeSlotManager implements SlotManager
{
pendingSlots = allocationResult.getNewAvailableResources();
if (!allocationResult.isSuccessfulAllocating()
&& sendNotEnoughResourceNotifications) {
- LOG.warn("Could not fulfill resource requirements of
job {}.", jobId);
+ LOG.warn(
+ "Could not fulfill resource requirements of
job {}. Free slots: {}",
+ jobId,
+ slotTracker.getFreeSlots().size());
resourceActions.notifyNotEnoughResourcesAvailable(
jobId,
resourceTracker.getAcquiredResources(jobId));
return pendingSlots;