This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new dc08df61329 [FLINK-32168] RM logs missing and current resources
dc08df61329 is described below
commit dc08df6132921dec0083ebf5a54a66f8447aa2c8
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue Jul 25 18:41:39 2023 +0200
[FLINK-32168] RM logs missing and current resources
---
.../slotmanager/FineGrainedSlotManager.java | 29 +++++++++++++++++++++-
1 file changed, 28 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index bca62e4f6df..772bcc8acdb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -58,6 +58,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
@@ -628,7 +629,8 @@ public class FineGrainedSlotManager implements SlotManager {
return;
}
- LOG.info("Matching resource requirements against available
resources.");
+ logMissingAndAvailableResource(missingResources);
+
missingResources =
missingResources.entrySet().stream()
.collect(
@@ -682,6 +684,31 @@ public class FineGrainedSlotManager implements SlotManager {
}
}
+ private void logMissingAndAvailableResource(
+ Map<JobID, Collection<ResourceRequirement>> missingResources) {
+ final StringJoiner lines = new StringJoiner(System.lineSeparator());
+ lines.add("Matching resource requirements against available
resources.");
+ lines.add("Missing resources:");
+ missingResources.forEach(
+ (jobId, resources) -> {
+ lines.add("\t Job " + jobId);
+ resources.forEach(resource ->
lines.add(String.format("\t\t%s", resource)));
+ });
+ lines.add("Current resources:");
+ if (taskManagerTracker.getRegisteredTaskManagers().isEmpty()) {
+ lines.add("\t(none)");
+ } else {
+ for (TaskManagerInfo taskManager :
taskManagerTracker.getRegisteredTaskManagers()) {
+ final ResourceID resourceId =
+
taskManager.getTaskExecutorConnection().getResourceID();
+ lines.add("\tTaskManager " + resourceId);
+ lines.add("\t\tAvailable: " +
taskManager.getAvailableResource());
+ lines.add("\t\tTotal: " + taskManager.getTotalResource());
+ }
+ }
+ LOG.info(lines.toString());
+ }
+
private void allocateSlotsAccordingTo(Map<JobID, Map<InstanceID,
ResourceCounter>> result) {
final List<CompletableFuture<Void>> allocationFutures = new
ArrayList<>();
for (Map.Entry<JobID, Map<InstanceID, ResourceCounter>> jobEntry :
result.entrySet()) {