zhuzhurk commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932010754
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java:
##########
@@ -131,18 +132,23 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
         Map<String, String> taskManagerId2Host = new HashMap<>();
         Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
         for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
-            TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
-            String taskManagerHost =
-                    location == null
-                            ? "(unassigned)"
-                            : location.getHostname() + ':' + location.dataPort();
-            String taskmanagerId =
-                    location == null ? "(unassigned)" : location.getResourceID().toString();
-            taskManagerId2Host.put(taskmanagerId, taskManagerHost);
-            List<AccessExecutionVertex> vertices =
-                    taskManagerVertices.computeIfAbsent(
-                            taskmanagerId, ignored -> new ArrayList<>(4));
-            vertices.add(vertex);
+            List<TaskManagerLocation> locations =
+                    vertex.getCurrentExecutions().stream()
+                            .map(AccessExecution::getAssignedResourceLocation)
+                            .collect(Collectors.toList());
+            for (TaskManagerLocation location : locations) {
+                String taskManagerHost =
+                        location == null
+                                ? "(unassigned)"
+                                : location.getHostname() + ':' + location.dataPort();
+                String taskmanagerId =
+                        location == null ? "(unassigned)" : location.getResourceID().toString();
+                taskManagerId2Host.put(taskmanagerId, taskManagerHost);
+                List<AccessExecutionVertex> vertices =
+                        taskManagerVertices.computeIfAbsent(
+                                taskmanagerId, ignored -> new ArrayList<>(4));
Review Comment:
Better to avoid such a magic-number initial capacity (although it was added
previously).
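A minimal sketch of the suggestion (names taken from the diff above): drop the hard-coded capacity and let `ArrayList` use its default growth strategy.

```java
// Sketch only: same logic as in the diff, without the magic-number initial capacity.
List<AccessExecutionVertex> vertices =
        taskManagerVertices.computeIfAbsent(taskmanagerId, ignored -> new ArrayList<>());
vertices.add(vertex);
```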
##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java:
##########
@@ -257,18 +258,19 @@ private Map<TaskManagerLocation, ImmutableSet<ExecutionAttemptID>> groupExecutio
                         executionVertex.getExecutionState());
                 continue;
             }
-            TaskManagerLocation tmLocation = executionVertex.getCurrentAssignedResourceLocation();
-            if (tmLocation == null) {
-                LOG.trace("ExecutionVertex {} is currently not assigned", executionVertex);
-                continue;
-            }
-            Set<ExecutionAttemptID> groupedAttemptIds =
-                    executionAttemptsByLocation.getOrDefault(tmLocation, new HashSet<>());
+            for (AccessExecution execution : executionVertex.getCurrentExecutions()) {
Review Comment:
Just to double-check: would any problem occur if one subtask has multiple attempts?
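For illustration, a rough sketch of how per-attempt grouping could look when one subtask has several live attempts. The names come from the surrounding diff; it assumes the intermediate map holds mutable sets before any conversion to `ImmutableSet`, and that `AccessExecution#getAttemptId` is the accessor for the attempt id.

```java
// Sketch: group each live attempt under the task manager it is actually assigned to,
// so several attempts of one subtask on different task managers end up in separate buckets.
for (AccessExecution execution : executionVertex.getCurrentExecutions()) {
    TaskManagerLocation tmLocation = execution.getAssignedResourceLocation();
    if (tmLocation == null) {
        LOG.trace("Execution {} is currently not assigned", execution);
        continue;
    }
    executionAttemptsByLocation
            .computeIfAbsent(tmLocation, ignored -> new HashSet<>())
            .add(execution.getAttemptId());
}
```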
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java:
##########
@@ -111,36 +111,44 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
         List<ArchivedJson> archive = new ArrayList<>(16);
         for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
             for (AccessExecutionVertex subtask : task.getTaskVertices()) {
-                ResponseBody curAttemptJson =
-                        SubtaskExecutionAttemptDetailsInfo.create(
-                                subtask.getCurrentExecutionAttempt(),
-                                null,
-                                graph.getJobID(),
-                                task.getJobVertexId());
-                String curAttemptPath =
-                        getMessageHeaders()
-                                .getTargetRestEndpointURL()
-                                .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
-                                .replace(
-                                        ':' + JobVertexIdPathParameter.KEY,
-                                        task.getJobVertexId().toString())
-                                .replace(
-                                        ':' + SubtaskIndexPathParameter.KEY,
-                                        String.valueOf(subtask.getParallelSubtaskIndex()))
-                                .replace(
-                                        ':' + SubtaskAttemptPathParameter.KEY,
-                                        String.valueOf(
-                                                subtask.getCurrentExecutionAttempt()
-                                                        .getAttemptNumber()));
-
-                archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+                for (AccessExecution attempt : subtask.getCurrentExecutions()) {
+                    if (attempt != null) {
Review Comment:
In which case can `attempt` be null?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
     }
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, ComponentMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
         for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore =
+                    taskMetricStore.getSubtaskMetricStore(subtaskIndex);
+            Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+                    subtaskMetricStore.getAllAttemptsMetricStores();
+            if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) {
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex, null, subtaskMetricStore, null));
+            } else {
+                int currentAttempt =
+                        currentExecutionAttempts == null
Review Comment:
In which case can `currentExecutionAttempts` be null?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
     }
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, ComponentMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
         for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore =
+                    taskMetricStore.getSubtaskMetricStore(subtaskIndex);
+            Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+                    subtaskMetricStore.getAllAttemptsMetricStores();
+            if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) {
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex, null, subtaskMetricStore, null));
+            } else {
+                int currentAttempt =
+                        currentExecutionAttempts == null
+                                ? -1
+                                : currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
+                if (!allAttemptsMetricStores.containsKey(currentAttempt)) {
+                    // allAttemptsMetricStores is not empty here
+                    currentAttempt = allAttemptsMetricStores.keySet().iterator().next();
+                }
+                List<SubtaskBackPressureInfo> otherConcurrentAttempts =
+                        new ArrayList<>(allAttemptsMetricStores.size() - 1);
+                for (Map.Entry<Integer, ComponentMetricStore> attemptStore :
+                        allAttemptsMetricStores.entrySet()) {
+                    if (attemptStore.getKey() == currentAttempt) {
+                        continue;
+                    }
+                    otherConcurrentAttempts.add(
+                            createSubtaskAttemptBackpressureInfo(
+                                    subtaskIndex,
+                                    attemptStore.getKey(),
+                                    attemptStore.getValue(),
+                                    null));
+                }
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex,
+                                currentAttempt,
+                                allAttemptsMetricStores.get(currentAttempt),
+                                otherConcurrentAttempts));
+            }
         }
         result.sort(Comparator.comparingInt(SubtaskBackPressureInfo::getSubtask));
         return result;
     }
+    private SubtaskBackPressureInfo createSubtaskAttemptBackpressureInfo(
+            int subtaskIndex,
+            Integer attemptNumber,
Review Comment:
Better to annotate it and `otherConcurrentAttempts` as `@Nullable`.
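A sketch of the suggested signature. The third parameter's type is inferred from the call sites in the diff, and `javax.annotation.Nullable` is assumed as the annotation.

```java
private SubtaskBackPressureInfo createSubtaskAttemptBackpressureInfo(
        int subtaskIndex,
        @Nullable Integer attemptNumber,
        ComponentMetricStore metricStore,
        @Nullable List<SubtaskBackPressureInfo> otherConcurrentAttempts) {
    // body unchanged
}
```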
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java:
##########
@@ -100,6 +106,36 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
     @JsonProperty(FIELD_NAME_STATUS_DURATION)
     private final Map<ExecutionState, Long> statusDuration;
+    @JsonProperty(FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS)
+    @JsonInclude(Include.NON_EMPTY)
+    @Nullable
+    private final List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts;
+
+    public SubtaskExecutionAttemptDetailsInfo(
Review Comment:
Is this for tests only? If so, I would suggest removing it and refactoring the tests instead.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
     }
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, ComponentMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
         for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore =
+                    taskMetricStore.getSubtaskMetricStore(subtaskIndex);
+            Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+                    subtaskMetricStore.getAllAttemptsMetricStores();
+            if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) {
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex, null, subtaskMetricStore, null));
+            } else {
+                int currentAttempt =
+                        currentExecutionAttempts == null
+                                ? -1
+                                : currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
+                if (!allAttemptsMetricStores.containsKey(currentAttempt)) {
+                    // allAttemptsMetricStores is not empty here
+                    currentAttempt = allAttemptsMetricStores.keySet().iterator().next();
+                }
+                List<SubtaskBackPressureInfo> otherConcurrentAttempts =
+                        new ArrayList<>(allAttemptsMetricStores.size() - 1);
+                for (Map.Entry<Integer, ComponentMetricStore> attemptStore :
+                        allAttemptsMetricStores.entrySet()) {
+                    if (attemptStore.getKey() == currentAttempt) {
+                        continue;
+                    }
+                    otherConcurrentAttempts.add(
+                            createSubtaskAttemptBackpressureInfo(
+                                    subtaskIndex,
+                                    attemptStore.getKey(),
+                                    attemptStore.getValue(),
+                                    null));
+                }
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex,
+                                currentAttempt,
+                                allAttemptsMetricStores.get(currentAttempt),
+                                otherConcurrentAttempts));
+            }
         }
         result.sort(Comparator.comparingInt(SubtaskBackPressureInfo::getSubtask));
Review Comment:
Is this a stable sort?
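For reference, `List#sort` is documented to be stable, so entries with equal subtask indices keep their insertion order. A small self-contained check; the `Info` record is a hypothetical stand-in for `SubtaskBackPressureInfo`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class StableSortCheck {
    // Hypothetical stand-in carrying just enough state to observe ordering.
    record Info(int subtask, int attempt) {}

    public static void main(String[] args) {
        List<Info> result =
                new ArrayList<>(List.of(new Info(1, 0), new Info(0, 0), new Info(1, 1)));
        // List.sort is specified to be stable: the two subtask-1 entries keep their
        // relative order (attempt 0 before attempt 1) after sorting by subtask index.
        result.sort(Comparator.comparingInt(Info::subtask));
        System.out.println(result);
    }
}
```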
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -391,4 +476,36 @@ private static TaskMetricStore unmodifiable(TaskMetricStore source) {
                     unmodifiableMap(source.metrics),
                     unmodifiableMap(source.subtasks));
         }
     }
+
+    /** Sub-structure containing metrics of a single subtask. */
+    @ThreadSafe
+    public static class SubtaskMetricStore extends ComponentMetricStore {
+        private final Map<Integer, ComponentMetricStore> attempts;
+
+        private SubtaskMetricStore() {
+            this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>());
+        }
+
+        private SubtaskMetricStore(
+                Map<String, String> metrics, Map<Integer, ComponentMetricStore> attempts) {
+            super(metrics);
+            this.attempts = checkNotNull(attempts);
+        }
+
+        private static SubtaskMetricStore unmodifiable(SubtaskMetricStore source) {
+            if (source == null) {
+                return null;
+            }
+            return new SubtaskMetricStore(
+                    unmodifiableMap(source.metrics), unmodifiableMap(source.attempts));
+        }
+
+        public ComponentMetricStore getAttemptsMetricStore(int attemptId) {
+            return attempts.get(attemptId);
+        }
+
+        public Map<Integer, ComponentMetricStore> getAllAttemptsMetricStores() {
+            return attempts;
Review Comment:
Better to make it unmodifiable.
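A minimal sketch of the suggestion, reusing the statically imported `unmodifiableMap` already used in this class.

```java
public Map<Integer, ComponentMetricStore> getAllAttemptsMetricStores() {
    // Expose a read-only view so callers cannot mutate the internal attempt map.
    return unmodifiableMap(attempts);
}
```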
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java:
##########
@@ -99,25 +99,27 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
         List<ArchivedJson> archive = new ArrayList<>(16);
         for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
             for (AccessExecutionVertex subtask : task.getTaskVertices()) {
-                ResponseBody curAttemptJson =
-                        createAccumulatorInfo(subtask.getCurrentExecutionAttempt());
-                String curAttemptPath =
-                        getMessageHeaders()
-                                .getTargetRestEndpointURL()
-                                .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
-                                .replace(
-                                        ':' + JobVertexIdPathParameter.KEY,
-                                        task.getJobVertexId().toString())
-                                .replace(
-                                        ':' + SubtaskIndexPathParameter.KEY,
-                                        String.valueOf(subtask.getParallelSubtaskIndex()))
-                                .replace(
-                                        ':' + SubtaskAttemptPathParameter.KEY,
-                                        String.valueOf(
-                                                subtask.getCurrentExecutionAttempt()
-                                                        .getAttemptNumber()));
-
-                archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+                for (AccessExecution attempt : subtask.getCurrentExecutions()) {
+                    if (attempt != null) {
Review Comment:
In which case can `attempt` be null?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
     }
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, ComponentMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
         for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore =
+                    taskMetricStore.getSubtaskMetricStore(subtaskIndex);
+            Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+                    subtaskMetricStore.getAllAttemptsMetricStores();
+            if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) {
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex, null, subtaskMetricStore, null));
+            } else {
+                int currentAttempt =
+                        currentExecutionAttempts == null
+                                ? -1
+                                : currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
+                if (!allAttemptsMetricStores.containsKey(currentAttempt)) {
Review Comment:
Is there any case where `currentAttempt != -1` but it is not contained in `allAttemptsMetricStores`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java:
##########
@@ -182,30 +188,32 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
                 allFinished &= state.isTerminal();
                 endTime = Math.max(endTime, vertex.getStateTimestamp(state));
-                counts.addIOMetrics(
-                        vertex.getCurrentExecutionAttempt(),
-                        metricFetcher,
-                        jobID.toString(),
-                        jobVertex.getJobVertexId().toString());
-                MutableIOMetrics current = new MutableIOMetrics();
-                current.addIOMetrics(
-                        vertex.getCurrentExecutionAttempt(),
-                        metricFetcher,
-                        jobID.toString(),
-                        jobVertex.getJobVertexId().toString());
-                ioMetricsInfos.add(
-                        new IOMetricsInfo(
-                                current.getNumBytesIn(),
-                                current.isNumBytesInComplete(),
-                                current.getNumBytesOut(),
-                                current.isNumBytesOutComplete(),
-                                current.getNumRecordsIn(),
-                                current.isNumRecordsInComplete(),
-                                current.getNumRecordsOut(),
-                                current.isNumRecordsOutComplete(),
-                                current.getAccumulateBackPressuredTime(),
-                                current.getAccumulateIdleTime(),
-                                current.getAccumulateBusyTime()));
+                for (AccessExecution attempt : vertex.getCurrentExecutions()) {
Review Comment:
This looks incorrect to me, because the attempts of an execution vertex may be located in different task managers.
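A rough sketch of the direction this points to: derive the task manager from each attempt's own location instead of from the vertex, so concurrent attempts running on different task managers are attributed correctly. `ioMetricsInfosByTaskManager` is a hypothetical accumulator map; the other names come from the diff.

```java
for (AccessExecution attempt : vertex.getCurrentExecutions()) {
    TaskManagerLocation attemptLocation = attempt.getAssignedResourceLocation();
    String attemptTaskManagerId =
            attemptLocation == null
                    ? "(unassigned)"
                    : attemptLocation.getResourceID().toString();
    MutableIOMetrics current = new MutableIOMetrics();
    current.addIOMetrics(
            attempt, metricFetcher, jobID.toString(), jobVertex.getJobVertexId().toString());
    // Attribute the attempt's IO metrics to the task manager that actually runs it.
    ioMetricsInfosByTaskManager
            .computeIfAbsent(attemptTaskManagerId, ignored -> new ArrayList<>())
            .add(current);
}
```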
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]