pnowojski commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1441710885
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java:
##########
@@ -343,6 +376,62 @@ public void reportIncompleteStats(
}
}
+ public void reportInitializationStartTs(long initializationStartTs) {
+ jobInitializationMetricsBuilder =
+ Optional.of(
+ new JobInitializationMetricsBuilder(
+ totalNumberOfSubTasks, initializationStartTs));
+ }
+
+ public void reportInitializationMetrics(SubTaskInitializationMetrics initializationMetrics) {
+ statsReadWriteLock.lock();
+ try {
+ if (!jobInitializationMetricsBuilder.isPresent()) {
+ LOG.warn(
+ "Attempted to report SubTaskInitializationMetrics [{}] without jobInitializationMetricsBuilder present",
+ initializationMetrics);
+ return;
+ }
+ jobInitializationMetricsBuilder
+ .get()
+ .reportInitializationMetrics(initializationMetrics);
+ if (jobInitializationMetricsBuilder.get().isComplete()) {
+ traceInitializationMetrics(jobInitializationMetricsBuilder.get().build());
+ }
+ } catch (Exception ex) {
+ LOG.warn("Failed to log SubTaskInitializationMetrics [{}]", initializationMetrics, ex);
+ } finally {
+ statsReadWriteLock.unlock();
+ }
+ }
+
+ private void traceInitializationMetrics(JobInitializationMetrics jobInitializationMetrics) {
+ SpanBuilder span =
+ Span.builder(CheckpointStatsTracker.class, "JobInitialization")
+ .setStartTsMillis(jobInitializationMetrics.getStartTs())
+ .setEndTsMillis(jobInitializationMetrics.getEndTs())
+ .setAttribute(
+ "initializationStatus",
+ jobInitializationMetrics.getStatus().name());
+ for (JobInitializationMetrics.SumMaxDuration duration :
+ jobInitializationMetrics.getDurationMetrics().values()) {
+ setDurationSpanAttribute(span, duration);
+ }
+ if (jobInitializationMetrics.getCheckpointId() != JobInitializationMetrics.UNSET) {
+ span.setAttribute("checkpointId", jobInitializationMetrics.getCheckpointId());
+ }
+ if (jobInitializationMetrics.getStateSize() != JobInitializationMetrics.UNSET) {
+ span.setAttribute("fullSize", jobInitializationMetrics.getStateSize());
+ }
+ metricGroup.addSpan(span);
+ }
+
+ private void setDurationSpanAttribute(
+ SpanBuilder span, JobInitializationMetrics.SumMaxDuration duration) {
+ span.setAttribute("max" + duration.getName(), duration.getMax());
+ span.setAttribute("sum" + duration.getName(), duration.getSum());
Review Comment:
🤷 Those seemed to be the most important to me. `avg` can be derived from
`sum` (given the number of subtasks). `max` is better when looking for
bottlenecks. `sum` is better when looking at optimising the resources used
by the process as a whole.
`sum` can also be helpful on its own, for example "sum of local state size
vs sum of remote state size" during recovery.
Reporting unaggregated values would require child spans from subtasks to be
reported. This was discussed as future work.
I also didn't want to spam the metrics too much. If needed, we can always
add `avg` or something else later.
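A rough sketch of the sum/max trade-off described above, in plain Java. Everything in it is hypothetical: `SumMaxSketch`, its nested `SumMaxDuration` (not Flink's `JobInitializationMetrics.SumMaxDuration`), the duration name and the per-subtask values are made up for illustration. It keeps only `sum` and `max` per named duration, emits them as `"max" + name` / `"sum" + name` in the same way as `setDurationSpanAttribute`, and shows that `avg` can be recovered from `sum` once the subtask count is known.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch; not Flink's JobInitializationMetrics.SumMaxDuration. */
public class SumMaxSketch {

    /** Aggregates one named duration across subtasks, keeping only sum and max. */
    static final class SumMaxDuration {
        private final String name;
        private long sum;
        private long max; // starts at 0, which is fine for non-negative durations
        private int count; // number of subtasks that reported this duration

        SumMaxDuration(String name) {
            this.name = name;
        }

        void add(long valueMillis) {
            sum += valueMillis;
            max = Math.max(max, valueMillis);
            count++;
        }

        long getSum() {
            return sum;
        }

        long getMax() {
            return max;
        }

        /** avg is not reported; it can be derived from sum when the count is known. */
        double deriveAvg() {
            return count == 0 ? 0.0 : (double) sum / count;
        }

        /** Mirrors the attribute naming used in the diff: "max" + name and "sum" + name. */
        Map<String, Long> toAttributes() {
            Map<String, Long> attrs = new LinkedHashMap<>();
            attrs.put("max" + name, max);
            attrs.put("sum" + name, sum);
            return attrs;
        }
    }

    public static void main(String[] args) {
        // Hypothetical per-subtask restore durations in milliseconds.
        long[] restoreMillis = {120, 80, 400, 100};
        SumMaxDuration restore = new SumMaxDuration("RestoredStateDurationMs");
        for (long d : restoreMillis) {
            restore.add(d);
        }
        System.out.println(restore.toAttributes()); // {maxRestoredStateDurationMs=400, sumRestoredStateDurationMs=700}
        System.out.println(restore.deriveAvg()); // 175.0
    }
}
```

With the sample values, `max` (400 ms) points at the slowest subtask, `sum` (700 ms) reflects the total work, and `avg` (175 ms) falls out as 700 / 4 without needing its own attribute.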
--
This is an automated message from the Apache Git Service.