gaoyunhaii commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r931909030
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -391,4 +476,36 @@ private static TaskMetricStore
unmodifiable(TaskMetricStore source) {
unmodifiableMap(source.metrics),
unmodifiableMap(source.subtasks));
}
}
+
+ /** Sub-structure containing metrics of a single subtask. */
+ @ThreadSafe
+ public static class SubtaskMetricStore extends ComponentMetricStore {
+ private final Map<Integer, ComponentMetricStore> attempts;
+
+ private SubtaskMetricStore() {
+ this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>());
+ }
+
+ private SubtaskMetricStore(
+ Map<String, String> metrics, Map<Integer,
ComponentMetricStore> attempts) {
+ super(metrics);
+ this.attempts = checkNotNull(attempts);
+ }
+
+ private static SubtaskMetricStore unmodifiable(SubtaskMetricStore
source) {
Review Comment:
This method does not seem to be used anywhere. Is it needed, or can it be removed?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -214,15 +260,33 @@ public void add(MetricDump metric) {
task = job.tasks.computeIfAbsent(taskInfo.vertexID, k ->
new TaskMetricStore());
subtask =
task.subtasks.computeIfAbsent(
- taskInfo.subtaskIndex, k -> new
ComponentMetricStore());
+ taskInfo.subtaskIndex, k -> new
SubtaskMetricStore());
+
+ if (taskInfo.attemptNumber >= 0) {
+ // Consider as the current attempt if current attempt
id is not set, which
+ // means there should be only one execution
+ isCurrentAttempt =
+ Optional.of(currentExecutionAttempts)
+ .map(m ->
m.get(taskInfo.jobID))
+ .map(m ->
m.get(taskInfo.vertexID))
+ .map(m ->
m.get(taskInfo.subtaskIndex))
+ .orElse(taskInfo.attemptNumber)
+ == taskInfo.attemptNumber;
+ attempt =
+ subtask.attempts.computeIfAbsent(
+ taskInfo.attemptNumber, k -> new
ComponentMetricStore());
+ addMetric(attempt.metrics, name, metric);
+ }
/**
* The duplication is intended. Metrics scoped by subtask
are useful for several
* job/task handlers, while the WebInterface task metric
queries currently do
* not account for subtasks, so we don't divide by subtask
and instead use the
* concatenation of subtask index and metric name as the
name for those.
*/
- addMetric(subtask.metrics, name, metric);
- addMetric(task.metrics, taskInfo.subtaskIndex + "." +
name, metric);
+ if (isCurrentAttempt) {
Review Comment:
Could we move the comment above into the `if` block, since the duplicated registration it explains now only happens for the current attempt?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo
createJobVertexBackPressureInfo(
}
private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
- Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+ TaskMetricStore taskMetricStore, Map<Integer, Integer>
currentExecutionAttempts) {
+ Map<Integer, ComponentMetricStore> subtaskMetricStores =
+ taskMetricStore.getAllSubtaskMetricStores();
List<SubtaskBackPressureInfo> result = new
ArrayList<>(subtaskMetricStores.size());
for (Map.Entry<Integer, ComponentMetricStore> entry :
subtaskMetricStores.entrySet()) {
int subtaskIndex = entry.getKey();
- ComponentMetricStore subtaskMetricStore = entry.getValue();
- double backPressureRatio =
getBackPressureRatio(subtaskMetricStore);
- double idleRatio = getIdleRatio(subtaskMetricStore);
- double busyRatio = getBusyRatio(subtaskMetricStore);
- result.add(
- new SubtaskBackPressureInfo(
- subtaskIndex,
- getBackPressureLevel(backPressureRatio),
- backPressureRatio,
- idleRatio,
- busyRatio));
+ SubtaskMetricStore subtaskMetricStore =
+ taskMetricStore.getSubtaskMetricStore(subtaskIndex);
Review Comment:
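We could make `subtaskMetricStores` a `Map<Integer, SubtaskMetricStore>` and use `entry.getValue()` directly, instead of looking up the subtask store again via `taskMetricStore.getSubtaskMetricStore(subtaskIndex)`.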
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -233,21 +297,42 @@ public void add(MetricDump metric) {
operatorInfo.vertexID, k -> new
TaskMetricStore());
subtask =
task.subtasks.computeIfAbsent(
- operatorInfo.subtaskIndex, k -> new
ComponentMetricStore());
+ operatorInfo.subtaskIndex, k -> new
SubtaskMetricStore());
+
+ if (operatorInfo.attemptNumber >= 0) {
Review Comment:
Could we extract a helper method shared by the subtask and operator scopes? The attempt-handling logic looks duplicated between them.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java:
##########
@@ -127,22 +128,24 @@ private static JobExceptionsInfoWithHistory
createJobExceptionsInfo(
List<JobExceptionsInfo.ExecutionExceptionInfo> taskExceptionList = new
ArrayList<>();
boolean truncated = false;
for (AccessExecutionVertex task :
executionGraph.getAllExecutionVertices()) {
- Optional<ErrorInfo> failure = task.getFailureInfo();
- if (failure.isPresent()) {
- if (taskExceptionList.size() >= exceptionToReportMaxSize) {
- truncated = true;
- break;
+ for (AccessExecution execution : task.getCurrentExecutions()) {
+ Optional<ErrorInfo> failure = execution.getFailureInfo();
+ if (failure.isPresent()) {
+ if (taskExceptionList.size() >= exceptionToReportMaxSize) {
+ truncated = true;
+ break;
+ }
+
+ TaskManagerLocation location =
execution.getAssignedResourceLocation();
+ String locationString = toString(location);
+ long timestamp =
execution.getStateTimestamp(ExecutionState.FAILED);
+ taskExceptionList.add(
+ new JobExceptionsInfo.ExecutionExceptionInfo(
+ failure.get().getExceptionAsString(),
+ task.getTaskNameWithSubtaskIndex(),
Review Comment:
It seems the page currently would not show any attempt information. Should we change this to distinguish the errors of different attempts?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java:
##########
@@ -120,9 +120,24 @@ private static JobVertexDetailsInfo
createJobVertexDetailsInfo(
for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
final AccessExecution execution =
vertex.getCurrentExecutionAttempt();
final JobVertexID jobVertexID = jobVertex.getJobVertexId();
+
+ final Collection<AccessExecution> attempts =
vertex.getCurrentExecutions();
+ List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts =
null;
+
+ if (attempts.size() > 1) {
+ otherConcurrentAttempts = new ArrayList<>();
+ for (AccessExecution attempt : attempts) {
+ if (attempt.getAttemptNumber() !=
execution.getAttemptNumber()) {
+ otherConcurrentAttempts.add(
+ SubtaskExecutionAttemptDetailsInfo.create(
+ attempt, metricFetcher, jobID,
jobVertexID, null));
+ }
+ }
+ }
+
subtasks.add(
SubtaskExecutionAttemptDetailsInfo.create(
- execution, metricFetcher, jobID, jobVertexID));
+ execution, metricFetcher, jobID, jobVertexID,
otherConcurrentAttempts));
Review Comment:
We might rename `execution` to `currentAttempt` to make it clearer how it differs from the other concurrent attempts.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -214,15 +260,33 @@ public void add(MetricDump metric) {
task = job.tasks.computeIfAbsent(taskInfo.vertexID, k ->
new TaskMetricStore());
subtask =
task.subtasks.computeIfAbsent(
- taskInfo.subtaskIndex, k -> new
ComponentMetricStore());
+ taskInfo.subtaskIndex, k -> new
SubtaskMetricStore());
+
+ if (taskInfo.attemptNumber >= 0) {
+ // Consider as the current attempt if current attempt
id is not set, which
+ // means there should be only one execution
+ isCurrentAttempt =
+ Optional.of(currentExecutionAttempts)
+ .map(m ->
m.get(taskInfo.jobID))
+ .map(m ->
m.get(taskInfo.vertexID))
+ .map(m ->
m.get(taskInfo.subtaskIndex))
+ .orElse(taskInfo.attemptNumber)
+ == taskInfo.attemptNumber;
+ attempt =
+ subtask.attempts.computeIfAbsent(
Review Comment:
It seems a bit odd to break encapsulation here by modifying `attempts` directly, but we could address that separately.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]