pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932469098
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java:
##########
@@ -182,30 +188,32 @@ private static JobVertexTaskManagersInfo
createJobVertexTaskManagersInfo(
allFinished &= state.isTerminal();
endTime = Math.max(endTime, vertex.getStateTimestamp(state));
- counts.addIOMetrics(
- vertex.getCurrentExecutionAttempt(),
- metricFetcher,
- jobID.toString(),
- jobVertex.getJobVertexId().toString());
- MutableIOMetrics current = new MutableIOMetrics();
- current.addIOMetrics(
- vertex.getCurrentExecutionAttempt(),
- metricFetcher,
- jobID.toString(),
- jobVertex.getJobVertexId().toString());
- ioMetricsInfos.add(
- new IOMetricsInfo(
- current.getNumBytesIn(),
- current.isNumBytesInComplete(),
- current.getNumBytesOut(),
- current.isNumBytesOutComplete(),
- current.getNumRecordsIn(),
- current.isNumRecordsInComplete(),
- current.getNumRecordsOut(),
- current.isNumRecordsOutComplete(),
- current.getAccumulateBackPressuredTime(),
- current.getAccumulateIdleTime(),
- current.getAccumulateBusyTime()));
+ for (AccessExecution attempt : vertex.getCurrentExecutions()) {
Review Comment:
You are right. While doing a further refactor of this handler, I found that
maybe we should use only the representing execution to calculate the aggregated
task manager status; otherwise the state can be confusing, since if one attempt
is CANCELED (even if it's a speculative one), the state of the task manager
becomes CANCELED. I don't think this is the expected behavior. I'd revert the
changes and add a comment to describe why we choose to use the representing
attempt only.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java:
##########
@@ -99,25 +99,27 @@ public Collection<ArchivedJson>
archiveJsonWithPath(AccessExecutionGraph graph)
List<ArchivedJson> archive = new ArrayList<>(16);
for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
for (AccessExecutionVertex subtask : task.getTaskVertices()) {
- ResponseBody curAttemptJson =
-
createAccumulatorInfo(subtask.getCurrentExecutionAttempt());
- String curAttemptPath =
- getMessageHeaders()
- .getTargetRestEndpointURL()
- .replace(':' + JobIDPathParameter.KEY,
graph.getJobID().toString())
- .replace(
- ':' + JobVertexIdPathParameter.KEY,
- task.getJobVertexId().toString())
- .replace(
- ':' + SubtaskIndexPathParameter.KEY,
-
String.valueOf(subtask.getParallelSubtaskIndex()))
- .replace(
- ':' + SubtaskAttemptPathParameter.KEY,
- String.valueOf(
-
subtask.getCurrentExecutionAttempt()
- .getAttemptNumber()));
-
- archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+ for (AccessExecution attempt : subtask.getCurrentExecutions())
{
+ if (attempt != null) {
Review Comment:
It refers to the HistoricalExecutions part, but indeed the check is redundant,
since the attempt should never be null. I'll remove it.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java:
##########
@@ -257,18 +258,19 @@ private Map<TaskManagerLocation,
ImmutableSet<ExecutionAttemptID>> groupExecutio
executionVertex.getExecutionState());
continue;
}
- TaskManagerLocation tmLocation =
executionVertex.getCurrentAssignedResourceLocation();
- if (tmLocation == null) {
- LOG.trace("ExecutionVertex {} is currently not assigned",
executionVertex);
- continue;
- }
- Set<ExecutionAttemptID> groupedAttemptIds =
- executionAttemptsByLocation.getOrDefault(tmLocation, new
HashSet<>());
+ for (AccessExecution execution :
executionVertex.getCurrentExecutions()) {
Review Comment:
The ExecutionAttemptID identifies a specific attempt of a specific
execution. It seems the further processing doesn't care whether multiple
ExecutionAttemptIDs belong to the same execution; it processes each
ExecutionAttemptID independently.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo
createJobVertexBackPressureInfo(
}
private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
- Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+ TaskMetricStore taskMetricStore, Map<Integer, Integer>
currentExecutionAttempts) {
+ Map<Integer, ComponentMetricStore> subtaskMetricStores =
+ taskMetricStore.getAllSubtaskMetricStores();
List<SubtaskBackPressureInfo> result = new
ArrayList<>(subtaskMetricStores.size());
for (Map.Entry<Integer, ComponentMetricStore> entry :
subtaskMetricStores.entrySet()) {
int subtaskIndex = entry.getKey();
- ComponentMetricStore subtaskMetricStore = entry.getValue();
- double backPressureRatio =
getBackPressureRatio(subtaskMetricStore);
- double idleRatio = getIdleRatio(subtaskMetricStore);
- double busyRatio = getBusyRatio(subtaskMetricStore);
- result.add(
- new SubtaskBackPressureInfo(
- subtaskIndex,
- getBackPressureLevel(backPressureRatio),
- backPressureRatio,
- idleRatio,
- busyRatio));
+ SubtaskMetricStore subtaskMetricStore =
+ taskMetricStore.getSubtaskMetricStore(subtaskIndex);
+ Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+ subtaskMetricStore.getAllAttemptsMetricStores();
+ if (allAttemptsMetricStores.isEmpty() ||
allAttemptsMetricStores.size() == 1) {
+ result.add(
+ createSubtaskAttemptBackpressureInfo(
+ subtaskIndex, null, subtaskMetricStore, null));
+ } else {
+ int currentAttempt =
+ currentExecutionAttempts == null
+ ? -1
+ :
currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
+ if (!allAttemptsMetricStores.containsKey(currentAttempt)) {
+ // allAttemptsMetricStores is not empty here
+ currentAttempt =
allAttemptsMetricStores.keySet().iterator().next();
+ }
+ List<SubtaskBackPressureInfo> otherConcurrentAttempts =
+ new ArrayList<>(allAttemptsMetricStores.size() - 1);
+ for (Map.Entry<Integer, ComponentMetricStore> attemptStore :
+ allAttemptsMetricStores.entrySet()) {
+ if (attemptStore.getKey() == currentAttempt) {
+ continue;
+ }
+ otherConcurrentAttempts.add(
+ createSubtaskAttemptBackpressureInfo(
+ subtaskIndex,
+ attemptStore.getKey(),
+ attemptStore.getValue(),
+ null));
+ }
+ result.add(
+ createSubtaskAttemptBackpressureInfo(
+ subtaskIndex,
+ currentAttempt,
+ allAttemptsMetricStores.get(currentAttempt),
+ otherConcurrentAttempts));
+ }
}
result.sort(Comparator.comparingInt(SubtaskBackPressureInfo::getSubtask));
Review Comment:
Yes it is, since there's only one element in the result for each subtask.
The other attempts are in otherConcurrentAttempts.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo
createJobVertexBackPressureInfo(
}
private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
- Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+ TaskMetricStore taskMetricStore, Map<Integer, Integer>
currentExecutionAttempts) {
+ Map<Integer, ComponentMetricStore> subtaskMetricStores =
+ taskMetricStore.getAllSubtaskMetricStores();
List<SubtaskBackPressureInfo> result = new
ArrayList<>(subtaskMetricStores.size());
for (Map.Entry<Integer, ComponentMetricStore> entry :
subtaskMetricStores.entrySet()) {
int subtaskIndex = entry.getKey();
- ComponentMetricStore subtaskMetricStore = entry.getValue();
- double backPressureRatio =
getBackPressureRatio(subtaskMetricStore);
- double idleRatio = getIdleRatio(subtaskMetricStore);
- double busyRatio = getBusyRatio(subtaskMetricStore);
- result.add(
- new SubtaskBackPressureInfo(
- subtaskIndex,
- getBackPressureLevel(backPressureRatio),
- backPressureRatio,
- idleRatio,
- busyRatio));
+ SubtaskMetricStore subtaskMetricStore =
+ taskMetricStore.getSubtaskMetricStore(subtaskIndex);
+ Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+ subtaskMetricStore.getAllAttemptsMetricStores();
+ if (allAttemptsMetricStores.isEmpty() ||
allAttemptsMetricStores.size() == 1) {
+ result.add(
+ createSubtaskAttemptBackpressureInfo(
+ subtaskIndex, null, subtaskMetricStore, null));
+ } else {
+ int currentAttempt =
+ currentExecutionAttempts == null
Review Comment:
currentExecutionAttempts is acquired from the JobDetails. In fact, the
currentExecutionAttempts entry for a subtask only exists when there's more than
one current attempt, so it's normal for currentExecutionAttempts to be null.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]