gaoyunhaii commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r931890536


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java:
##########
@@ -47,6 +48,9 @@ public ClusterOverviewWithVersion(
             @JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
             @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
             @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
+            @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED) @Nullable Integer numTaskManagersBlocked,

Review Comment:
   Similarly, could we directly use `int` here?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java:
##########
@@ -79,6 +96,11 @@ public ArchivedExecution getCurrentExecutionAttempt() {
         return currentExecution;
     }
 
+    @Override
+    public Collection<AccessExecution> getCurrentExecutions() {
+        return Collections.unmodifiableCollection(currentExecutions);

Review Comment:
   Considering how often this method is called, I lean toward maintaining the 
current executions directly as an unmodifiable collection, instead of wrapping 
them on every call.
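
   A minimal sketch of this suggestion, assuming a simplified stand-in class 
(`ArchivedVertexSketch` and its `String` elements are hypothetical, not the 
PR's types): the collection is wrapped once in the constructor, so the getter 
hands out the same read-only view on every call.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for ArchivedExecutionVertex; only the collection
// handling is shown, and String replaces AccessExecution for brevity.
final class ArchivedVertexSketch {

    // Stored as an unmodifiable view from the start.
    private final Collection<String> currentExecutions;

    ArchivedVertexSketch(List<String> executions) {
        // Copy defensively, then wrap exactly once at construction time.
        this.currentExecutions =
                Collections.unmodifiableCollection(new ArrayList<>(executions));
    }

    Collection<String> getCurrentExecutions() {
        // No per-call wrapper allocation: the field is already read-only.
        return currentExecutions;
    }
}
```

   Whether the saved allocation matters depends on how hot the getter really 
is; the sketch only shows the shape of the change.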



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java:
##########
@@ -40,15 +44,27 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
 
     private final ArchivedExecution currentExecution; // this field must never be null
 
+    private final Collection<AccessExecution> currentExecutions;
+
     // ------------------------------------------------------------------------
 
     public ArchivedExecutionVertex(ExecutionVertex vertex) {
         this.subTaskIndex = vertex.getParallelSubtaskIndex();
         this.executionHistory = getCopyOfExecutionHistory(vertex);
         this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
-        this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
+
+        Execution vertexCurrentExecution = vertex.getCurrentExecutionAttempt();
+        currentExecutions = new ArrayList<>(vertex.getCurrentExecutions().size());
+        currentExecution = vertexCurrentExecution.archive();
+        currentExecutions.add(currentExecution);
+        for (Execution execution : vertex.getCurrentExecutions()) {

Review Comment:
   Judging from the other parts of the code, it seems we should not exclude the 
representing execution from the list?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -685,15 +688,31 @@ public CompletableFuture<TaskManagerInfoWithSlots> requestTaskManagerDetailsInfo
     @Override
     public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
         final int numberSlots = slotManager.getNumberRegisteredSlots();
-        final int numberFreeSlots = slotManager.getNumberFreeSlots();
         final ResourceProfile totalResource = slotManager.getRegisteredResource();
-        final ResourceProfile freeResource = slotManager.getFreeResource();
 
+        int numberFreeSlots = slotManager.getNumberFreeSlots();

Review Comment:
   We might skip the added checks if `blocklistHandler.getAllBlockedNodeIds()` 
is empty?
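
   A rough sketch of the short-circuit, under simplifying assumptions: the 
class, method, and parameter names below are hypothetical, and blocked nodes 
are keyed directly by task manager ID, which may not match how the PR maps 
node IDs to task managers.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of ResourceManager.
final class ResourceOverviewSketch {

    /** Counts free slots, leaving out those on blocked task managers. */
    static int countUsableFreeSlots(
            Map<String, Integer> freeSlotsByTaskManager, Set<String> blockedTaskManagers) {
        int total = 0;
        for (int freeSlots : freeSlotsByTaskManager.values()) {
            total += freeSlots;
        }

        // Fast path: nothing is blocked, so no per-task-manager checks are needed.
        if (blockedTaskManagers.isEmpty()) {
            return total;
        }

        // Slow path: subtract the free slots that sit on blocked task managers.
        for (String blockedId : blockedTaskManagers) {
            total -= freeSlotsByTaskManager.getOrDefault(blockedId, 0);
        }
        return total;
    }
}
```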



##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java:
##########
@@ -44,11 +50,22 @@ public class ClusterOverview extends JobsOverview {
     @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE)
     private final int numSlotsAvailable;
 
+    @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED)
+    @JsonInclude(Include.NON_DEFAULT)
+    private final int numTaskManagersBlocked;
+
+    @JsonProperty(FIELD_NAME_SLOTS_FREE_AND_BLOCKED)
+    @JsonInclude(Include.NON_DEFAULT)
+    private final int numSlotsFreeAndBlocked;
+
     @JsonCreator
     public ClusterOverview(
             @JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
             @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
             @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
+            @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED) @Nullable Integer numTaskManagersBlocked,

Review Comment:
   If we already use `int` for the field, why can't we also use `int` for the 
parameter? It seems there is no situation that actually passes null.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java:
##########
@@ -271,6 +324,21 @@ public void serialize(
 
             jsonGenerator.writeEndObject();
 
+            if (jobDetails.currentExecutionAttempts != null

Review Comment:
   It seems `currentExecutionAttempts` cannot be null?


