rkhachatryan commented on a change in pull request #14635:
URL: https://github.com/apache/flink/pull/14635#discussion_r557474839
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -1051,6 +1051,34 @@ public void acknowledgeCheckpoint(
         }
     }
+    @Override
+    public void reportCheckpointMetrics(
+            JobID jobID, ExecutionAttemptID attemptId, long id, CheckpointMetrics metrics) {
+        mainThreadExecutor.assertRunningInMainThread();
+
+        final CheckpointCoordinator checkpointCoordinator =
+                executionGraph.getCheckpointCoordinator();
+
+        if (checkpointCoordinator != null) {
+            ioExecutor.execute(
+                    () -> {
+                        try {
+                            checkpointCoordinator.reportStats(id, attemptId, metrics);
+                        } catch (Throwable t) {
+                            log.warn("Error while processing report checkpoint stats message", t);
+                        }
+                    });
+        } else {
+            String errorMessage =
+                    "Received ReportCheckpointStats message for job {} with no CheckpointCoordinator";
+            if (executionGraph.getState() == JobStatus.RUNNING) {
+                log.error(errorMessage, jobGraph.getJobID());
+            } else {
+                log.debug(errorMessage, jobGraph.getJobID());
+            }
+        }
+    }
Review comment:
Good idea!
(I'll extract a method in a separate commit, since at least 2 existing
methods already share this pattern.)
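
For illustration only, the extraction mentioned above could look roughly like the sketch below. It is not the actual follow-up commit: the class name, the `reportMissingCoordinator` helper, the reduced `JobStatus` enum, and the in-memory `LOGGED` list (standing in for the real logger) are all hypothetical, chosen so the example compiles without Flink on the classpath. The message type passed in the second call is likewise just an example string.

```java
import java.util.ArrayList;
import java.util.List;

class MissingCoordinatorLogging {

    // Hypothetical stand-in for Flink's JobStatus; only two states are modeled.
    enum JobStatus { RUNNING, FINISHED }

    // Collects "LEVEL: message" strings instead of calling a real logger,
    // so the sketch stays self-contained.
    static final List<String> LOGGED = new ArrayList<>();

    // Hypothetical shared helper: a missing coordinator is treated as an error
    // while the job is RUNNING and as debug-level noise in any other state.
    static void reportMissingCoordinator(String messageType, String jobId, JobStatus status) {
        String level = (status == JobStatus.RUNNING) ? "ERROR" : "DEBUG";
        LOGGED.add(
                level
                        + ": Received "
                        + messageType
                        + " message for job "
                        + jobId
                        + " with no CheckpointCoordinator");
    }

    public static void main(String[] args) {
        // Each caller passes only its message type; the level decision lives in one place.
        reportMissingCoordinator("ReportCheckpointStats", "job-1", JobStatus.RUNNING);
        reportMissingCoordinator("AcknowledgeCheckpoint", "job-1", JobStatus.FINISHED);
        LOGGED.forEach(System.out::println);
    }
}
```

With a helper of this shape, each of the affected methods would keep only its own null check and delegate the RUNNING-versus-other log-level decision to the shared method.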