gaoyunhaii commented on code in PR #20733:
URL: https://github.com/apache/flink/pull/20733#discussion_r961403379
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -82,18 +83,52 @@ synchronized void retainTaskManagers(List<String>
activeTaskManagers) {
*/
synchronized void retainJobs(List<String> activeJobs) {
jobs.keySet().retainAll(activeJobs);
- currentExecutionAttempts.keySet().retainAll(activeJobs);
+ representativeAttempts.keySet().retainAll(activeJobs);
}
public synchronized void
updateCurrentExecutionAttempts(Collection<JobDetails> jobs) {
- jobs.forEach(
- job ->
- currentExecutionAttempts.put(
- job.getJobId().toString(),
job.getCurrentExecutionAttempts()));
+ for (JobDetails job : jobs) {
+ String jobId = job.getJobId().toString();
+ Map<String, Map<Integer, List<Integer>>> currentExecutionAttempts =
+ job.getCurrentExecutionAttempts();
+ Map<String, Map<Integer, Integer>> jobRepresentativeAttempts =
+ new HashMap<>(currentExecutionAttempts.size());
+ for (Map.Entry<String, Map<Integer, List<Integer>>> vertexEntry :
+ currentExecutionAttempts.entrySet()) {
+ String vertexId = vertexEntry.getKey();
+ Map<Integer, List<Integer>> subtaskAttempts =
vertexEntry.getValue();
+ Map<Integer, Integer> vertexRepresentativeAttempts =
+ new HashMap<>(subtaskAttempts.size());
+ for (Map.Entry<Integer, List<Integer>> e :
subtaskAttempts.entrySet()) {
+ int subtaskIndex = e.getKey();
+ List<Integer> currentAttemptNumbers = e.getValue();
+ retainSubtaskAttemptMetrics(
+ jobId, vertexId, subtaskIndex,
currentAttemptNumbers);
+ if (!currentAttemptNumbers.isEmpty()) {
+ vertexRepresentativeAttempts.put(
+ subtaskIndex, currentAttemptNumbers.get(0));
+ }
+ }
+ if (!vertexRepresentativeAttempts.isEmpty()) {
+ jobRepresentativeAttempts.put(vertexId,
vertexRepresentativeAttempts);
+ }
+ }
+ representativeAttempts.put(jobId, jobRepresentativeAttempts);
+ }
+ }
+
+ private void retainSubtaskAttemptMetrics(
+ String jobId, String vertexId, int subtaskIndex, List<Integer>
currentAttempts) {
+ Map<Integer, ComponentMetricStore> attemptMetrics =
Review Comment:
We could move this method into `SubtaskMetricStore` so that we neither expose
`attemptMetrics` nor modify it outside of its containing class.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -82,18 +83,52 @@ synchronized void retainTaskManagers(List<String>
activeTaskManagers) {
*/
synchronized void retainJobs(List<String> activeJobs) {
jobs.keySet().retainAll(activeJobs);
- currentExecutionAttempts.keySet().retainAll(activeJobs);
+ representativeAttempts.keySet().retainAll(activeJobs);
}
public synchronized void
updateCurrentExecutionAttempts(Collection<JobDetails> jobs) {
- jobs.forEach(
- job ->
- currentExecutionAttempts.put(
- job.getJobId().toString(),
job.getCurrentExecutionAttempts()));
+ for (JobDetails job : jobs) {
+ String jobId = job.getJobId().toString();
+ Map<String, Map<Integer, List<Integer>>> currentExecutionAttempts =
+ job.getCurrentExecutionAttempts();
+ Map<String, Map<Integer, Integer>> jobRepresentativeAttempts =
+ new HashMap<>(currentExecutionAttempts.size());
+ for (Map.Entry<String, Map<Integer, List<Integer>>> vertexEntry :
+ currentExecutionAttempts.entrySet()) {
+ String vertexId = vertexEntry.getKey();
+ Map<Integer, List<Integer>> subtaskAttempts =
vertexEntry.getValue();
+ Map<Integer, Integer> vertexRepresentativeAttempts =
Review Comment:
We could simplify the computation to:
```
Map<Integer, Integer> vertexRepresentativeAttempts =
vertexEntry.getValue().entrySet().stream()
.filter(entry -> !entry.getValue().isEmpty())
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry ->
entry.getValue().get(0)));
```
This saves the explicit `Map.Entry` declarations, which make the code a bit
complex.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -82,18 +83,52 @@ synchronized void retainTaskManagers(List<String>
activeTaskManagers) {
*/
synchronized void retainJobs(List<String> activeJobs) {
jobs.keySet().retainAll(activeJobs);
- currentExecutionAttempts.keySet().retainAll(activeJobs);
+ representativeAttempts.keySet().retainAll(activeJobs);
}
public synchronized void
updateCurrentExecutionAttempts(Collection<JobDetails> jobs) {
- jobs.forEach(
- job ->
- currentExecutionAttempts.put(
- job.getJobId().toString(),
job.getCurrentExecutionAttempts()));
+ for (JobDetails job : jobs) {
+ String jobId = job.getJobId().toString();
+ Map<String, Map<Integer, List<Integer>>> currentExecutionAttempts =
+ job.getCurrentExecutionAttempts();
+ Map<String, Map<Integer, Integer>> jobRepresentativeAttempts =
+ new HashMap<>(currentExecutionAttempts.size());
+ for (Map.Entry<String, Map<Integer, List<Integer>>> vertexEntry :
+ currentExecutionAttempts.entrySet()) {
+ String vertexId = vertexEntry.getKey();
+ Map<Integer, List<Integer>> subtaskAttempts =
vertexEntry.getValue();
+ Map<Integer, Integer> vertexRepresentativeAttempts =
+ new HashMap<>(subtaskAttempts.size());
+ for (Map.Entry<Integer, List<Integer>> e :
subtaskAttempts.entrySet()) {
+ int subtaskIndex = e.getKey();
+ List<Integer> currentAttemptNumbers = e.getValue();
+ retainSubtaskAttemptMetrics(
Review Comment:
I'd lean toward separating this step into a dedicated iteration:
```
// retain subtask attempt metrics
job.getCurrentExecutionAttempts()
.forEach(
(vertexId, subtaskAttempts) -> {
subtaskAttempts.forEach(
(subtaskIndex, attempts) ->
retainSubtaskAttemptMetrics(
job.getJobId().toString(),
vertexId,
subtaskIndex,
attempts));
});
// Updates representative attempts
...
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java:
##########
@@ -84,10 +85,13 @@ public class JobDetails implements Serializable {
/**
* The map holds the attempt number of the current execution attempt in
the Execution, which is
* considered as the representing execution for the subtask of the vertex.
The keys and values
- * are JobVertexID -> SubtaskIndex -> CurrentExecutionAttemptNumber. It is
used to accumulate
- * the metrics of a subtask in MetricFetcher.
+ * are JobVertexID -> SubtaskIndex -> CurrentExecutionAttemptNumbers. The
first element in the
+ * CurrentExecutionAttemptNumbers is of the representative attempt.
+ *
+ * <p>The field is excluded from the json. Any usage from the web UI and
the history server is
+ * not allowed.
*/
- private final Map<String, Map<Integer, Integer>> currentExecutionAttempts;
+ private final Map<String, Map<Integer, List<Integer>>>
currentExecutionAttempts;
Review Comment:
We could change the List to an explicit structure, which could be defined inside
`JobDetails`.
It might be something like:
```java
public static final class CurrentAttempts {
public final int representativeAttempts;
public final List<Integer> currentAttempts;
public CurrentAttempts(int representativeAttempts, List<Integer>
currentAttempts) {
this.representativeAttempts = representativeAttempts;
this.currentAttempts =
Collections.unmodifiableList(currentAttempts);
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]