ztison commented on code in PR #24911:
URL: https://github.com/apache/flink/pull/24911#discussion_r1657013866


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java:
##########
@@ -214,402 +60,49 @@ void reportRestoredCheckpoint(RestoredCheckpointStats 
restored) {
                 restored.getStateSize());
     }
 
-    public void reportRestoredCheckpoint(
+    void reportRestoredCheckpoint(
             long checkpointID,
             CheckpointProperties properties,
             String externalPath,
-            long stateSize) {
-        statsReadWriteLock.lock();
-        try {
-            counts.incrementRestoredCheckpoints();
-            checkState(
-                    jobInitializationMetricsBuilder.isPresent(),
-                    "JobInitializationMetrics should have been set first, 
before RestoredCheckpointStats");
-            jobInitializationMetricsBuilder
-                    .get()
-                    .setRestoredCheckpointStats(checkpointID, stateSize, 
properties, externalPath);
-            dirty = true;
-        } finally {
-            statsReadWriteLock.unlock();
-        }
-    }
+            long stateSize);
 
     /**
      * Callback when a checkpoint completes.
      *
      * @param completed The completed checkpoint stats.
      */
-    void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
-        statsReadWriteLock.lock();
-        try {
-            latestCompletedCheckpoint = completed;
+    void reportCompletedCheckpoint(CompletedCheckpointStats completed);
 
-            counts.incrementCompletedCheckpoints();
-            history.replacePendingCheckpointById(completed);
+    PendingCheckpointStats getPendingCheckpointStats(long checkpointId);
 
-            summary.updateSummary(completed);
+    void reportIncompleteStats(
+            long checkpointId, ExecutionAttemptID attemptId, CheckpointMetrics 
metrics);
 
-            dirty = true;
-            logCheckpointStatistics(completed);
-        } finally {
-            statsReadWriteLock.unlock();
-        }
-    }
+    void reportInitializationStarted(
+            Set<ExecutionAttemptID> toInitialize, long initializationStartTs);
 
-    /**
-     * Callback when a checkpoint fails.
-     *
-     * @param failed The failed checkpoint stats.
-     */
-    void reportFailedCheckpoint(FailedCheckpointStats failed) {
-        statsReadWriteLock.lock();
-        try {
-            counts.incrementFailedCheckpoints();
-            history.replacePendingCheckpointById(failed);
-
-            dirty = true;
-            logCheckpointStatistics(failed);
-        } finally {
-            statsReadWriteLock.unlock();
-        }
-    }
-
-    private void logCheckpointStatistics(AbstractCheckpointStats 
checkpointStats) {
-        try {
-            metricGroup.addSpan(
-                    Span.builder(CheckpointStatsTracker.class, "Checkpoint")
-                            
.setStartTsMillis(checkpointStats.getTriggerTimestamp())
-                            
.setEndTsMillis(checkpointStats.getLatestAckTimestamp())
-                            .setAttribute("checkpointId", 
checkpointStats.getCheckpointId())
-                            .setAttribute("fullSize", 
checkpointStats.getStateSize())
-                            .setAttribute("checkpointedSize", 
checkpointStats.getCheckpointedSize())
-                            .setAttribute("checkpointStatus", 
checkpointStats.getStatus().name()));
-            if (LOG.isDebugEnabled()) {
-                StringWriter sw = new StringWriter();
-                MAPPER.writeValue(
-                        sw,
-                        
CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true));
-                String jsonDump = sw.toString();
-                LOG.debug(
-                        "CheckpointStatistics (for jobID={}, checkpointId={}) 
dump = {} ",
-                        metricGroup.jobId(),
-                        checkpointStats.checkpointId,
-                        jsonDump);
-            }
-        } catch (Exception ex) {
-            LOG.warn("Fail to log CheckpointStatistics", ex);
-        }
-    }
-
-    /**
-     * Callback when a checkpoint failure without in progress checkpoint. For 
example, it should be
-     * callback when triggering checkpoint failure before creating 
PendingCheckpoint.
-     */
-    public void reportFailedCheckpointsWithoutInProgress() {
-        statsReadWriteLock.lock();
-        try {
-            counts.incrementFailedCheckpointsWithoutInProgress();
-
-            dirty = true;
-        } finally {
-            statsReadWriteLock.unlock();
-        }
-    }
-
-    public PendingCheckpointStats getPendingCheckpointStats(long checkpointId) 
{
-        statsReadWriteLock.lock();
-        try {
-            AbstractCheckpointStats stats = 
history.getCheckpointById(checkpointId);
-            return stats instanceof PendingCheckpointStats ? 
(PendingCheckpointStats) stats : null;
-        } finally {
-            statsReadWriteLock.unlock();
-        }
-    }
-
-    public void reportIncompleteStats(
-            long checkpointId, ExecutionAttemptID attemptId, CheckpointMetrics 
metrics) {
-        statsReadWriteLock.lock();
-        try {
-            AbstractCheckpointStats stats = 
history.getCheckpointById(checkpointId);
-            if (stats instanceof PendingCheckpointStats) {
-                ((PendingCheckpointStats) stats)
-                        .reportSubtaskStats(
-                                attemptId.getJobVertexId(),
-                                new SubtaskStateStats(
-                                        attemptId.getSubtaskIndex(),
-                                        System.currentTimeMillis(),
-                                        
metrics.getBytesPersistedOfThisCheckpoint(),
-                                        metrics.getTotalBytesPersisted(),
-                                        metrics.getSyncDurationMillis(),
-                                        metrics.getAsyncDurationMillis(),
-                                        
metrics.getBytesProcessedDuringAlignment(),
-                                        
metrics.getBytesPersistedDuringAlignment(),
-                                        metrics.getAlignmentDurationNanos() / 
1_000_000,
-                                        metrics.getCheckpointStartDelayNanos() 
/ 1_000_000,
-                                        metrics.getUnalignedCheckpoint(),
-                                        false));
-                dirty = true;
-            }
-        } finally {
-            statsReadWriteLock.unlock();
-        }
-    }
-
-    public void reportInitializationStarted(
-            Set<ExecutionAttemptID> toInitialize, long initializationStartTs) {
-        jobInitializationMetricsBuilder =
-                Optional.of(
-                        new JobInitializationMetricsBuilder(toInitialize, 
initializationStartTs));
-    }
-
-    public void reportInitializationMetrics(
+    void reportInitializationMetrics(
             ExecutionAttemptID executionAttemptId,
-            SubTaskInitializationMetrics initializationMetrics) {
-        statsReadWriteLock.lock();
-        try {
-            if (!jobInitializationMetricsBuilder.isPresent()) {
-                LOG.warn(
-                        "Attempted to report SubTaskInitializationMetrics [{}] 
without jobInitializationMetricsBuilder present",
-                        initializationMetrics);
-                return;
-            }
-            JobInitializationMetricsBuilder builder = 
jobInitializationMetricsBuilder.get();
-            builder.reportInitializationMetrics(executionAttemptId, 
initializationMetrics);
-            if (builder.isComplete()) {
-                traceInitializationMetrics(builder.build());
-            }
-        } catch (Exception ex) {
-            LOG.warn("Failed to log SubTaskInitializationMetrics [{}]", 
initializationMetrics, ex);
-        } finally {
-            statsReadWriteLock.unlock();
-        }
-    }
-
-    private void traceInitializationMetrics(JobInitializationMetrics 
jobInitializationMetrics) {
-        SpanBuilder span =
-                Span.builder(CheckpointStatsTracker.class, "JobInitialization")
-                        
.setStartTsMillis(jobInitializationMetrics.getStartTs())
-                        .setEndTsMillis(jobInitializationMetrics.getEndTs())
-                        .setAttribute(
-                                "initializationStatus",
-                                jobInitializationMetrics.getStatus().name());
-        for (JobInitializationMetrics.SumMaxDuration duration :
-                jobInitializationMetrics.getDurationMetrics().values()) {
-            setDurationSpanAttribute(span, duration);
-        }
-        if (jobInitializationMetrics.getCheckpointId() != 
JobInitializationMetrics.UNSET) {
-            span.setAttribute("checkpointId", 
jobInitializationMetrics.getCheckpointId());
-        }
-        if (jobInitializationMetrics.getStateSize() != 
JobInitializationMetrics.UNSET) {
-            span.setAttribute("fullSize", 
jobInitializationMetrics.getStateSize());
-        }
-        metricGroup.addSpan(span);
-    }
-
-    private void setDurationSpanAttribute(
-            SpanBuilder span, JobInitializationMetrics.SumMaxDuration 
duration) {
-        span.setAttribute("max" + duration.getName(), duration.getMax());
-        span.setAttribute("sum" + duration.getName(), duration.getSum());
-    }
-
-    // ------------------------------------------------------------------------
-    // Metrics
-    // ------------------------------------------------------------------------
-
-    @VisibleForTesting
-    static final String NUMBER_OF_CHECKPOINTS_METRIC = 
"totalNumberOfCheckpoints";
-
-    @VisibleForTesting
-    static final String NUMBER_OF_IN_PROGRESS_CHECKPOINTS_METRIC = 
"numberOfInProgressCheckpoints";
-
-    @VisibleForTesting
-    static final String NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC = 
"numberOfCompletedCheckpoints";
-
-    @VisibleForTesting
-    static final String NUMBER_OF_FAILED_CHECKPOINTS_METRIC = 
"numberOfFailedCheckpoints";
-
-    @VisibleForTesting
-    static final String LATEST_RESTORED_CHECKPOINT_TIMESTAMP_METRIC =
-            "lastCheckpointRestoreTimestamp";
-
-    @VisibleForTesting
-    static final String LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC = 
"lastCheckpointSize";
-
-    @VisibleForTesting
-    static final String LATEST_COMPLETED_CHECKPOINT_FULL_SIZE_METRIC = 
"lastCheckpointFullSize";
-
-    @VisibleForTesting
-    static final String LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC = 
"lastCheckpointDuration";
-
-    @VisibleForTesting
-    static final String LATEST_COMPLETED_CHECKPOINT_PROCESSED_DATA_METRIC =
-            "lastCheckpointProcessedData";
-
-    @VisibleForTesting
-    static final String LATEST_COMPLETED_CHECKPOINT_PERSISTED_DATA_METRIC =
-            "lastCheckpointPersistedData";
-
-    @VisibleForTesting
-    static final String LATEST_COMPLETED_CHECKPOINT_EXTERNAL_PATH_METRIC =
-            "lastCheckpointExternalPath";
-
-    @VisibleForTesting
-    static final String LATEST_COMPLETED_CHECKPOINT_ID_METRIC = 
"lastCompletedCheckpointId";
+            SubTaskInitializationMetrics initializationMetrics);
 
     /**
-     * Register the exposed metrics.
+     * Creates a new pending checkpoint tracker.
      *
-     * @param metricGroup Metric group to use for the metrics.
+     * @param checkpointId ID of the checkpoint.
+     * @param triggerTimestamp Trigger timestamp of the checkpoint.
+     * @param props The checkpoint properties.
+     * @param vertexToDop mapping of {@link JobVertexID} to DOP
+     * @return Tracker for statistics gathering.
      */
-    private void registerMetrics(MetricGroup metricGroup) {
-        metricGroup.gauge(NUMBER_OF_CHECKPOINTS_METRIC, new 
CheckpointsCounter());
-        metricGroup.gauge(
-                NUMBER_OF_IN_PROGRESS_CHECKPOINTS_METRIC, new 
InProgressCheckpointsCounter());
-        metricGroup.gauge(
-                NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC, new 
CompletedCheckpointsCounter());
-        metricGroup.gauge(NUMBER_OF_FAILED_CHECKPOINTS_METRIC, new 
FailedCheckpointsCounter());
-        metricGroup.gauge(
-                LATEST_RESTORED_CHECKPOINT_TIMESTAMP_METRIC,
-                new LatestRestoredCheckpointTimestampGauge());
-        metricGroup.gauge(
-                LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC, new 
LatestCompletedCheckpointSizeGauge());
-        metricGroup.gauge(
-                LATEST_COMPLETED_CHECKPOINT_FULL_SIZE_METRIC,
-                new LatestCompletedCheckpointFullSizeGauge());
-        metricGroup.gauge(
-                LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC,
-                new LatestCompletedCheckpointDurationGauge());
-        metricGroup.gauge(
-                LATEST_COMPLETED_CHECKPOINT_PROCESSED_DATA_METRIC,
-                new LatestCompletedCheckpointProcessedDataGauge());
-        metricGroup.gauge(
-                LATEST_COMPLETED_CHECKPOINT_PERSISTED_DATA_METRIC,
-                new LatestCompletedCheckpointPersistedDataGauge());
-        metricGroup.gauge(
-                LATEST_COMPLETED_CHECKPOINT_EXTERNAL_PATH_METRIC,
-                new LatestCompletedCheckpointExternalPathGauge());
-        metricGroup.gauge(
-                LATEST_COMPLETED_CHECKPOINT_ID_METRIC, new 
LatestCompletedCheckpointIdGauge());
-    }
-
-    private class CheckpointsCounter implements Gauge<Long> {
-        @Override
-        public Long getValue() {
-            return counts.getTotalNumberOfCheckpoints();
-        }
-    }
-
-    private class InProgressCheckpointsCounter implements Gauge<Integer> {
-        @Override
-        public Integer getValue() {
-            return counts.getNumberOfInProgressCheckpoints();
-        }
-    }
-
-    private class CompletedCheckpointsCounter implements Gauge<Long> {
-        @Override
-        public Long getValue() {
-            return counts.getNumberOfCompletedCheckpoints();
-        }
-    }
-
-    private class FailedCheckpointsCounter implements Gauge<Long> {
-        @Override
-        public Long getValue() {
-            return counts.getNumberOfFailedCheckpoints();
-        }
-    }
-
-    private class LatestRestoredCheckpointTimestampGauge implements 
Gauge<Long> {
-        @Override
-        public Long getValue() {
-            return jobInitializationMetricsBuilder
-                    .map(JobInitializationMetricsBuilder::getStartTs)
-                    .orElse(-1L);
-        }
-    }
-
-    private class LatestCompletedCheckpointSizeGauge implements Gauge<Long> {
-        @Override
-        public Long getValue() {
-            CompletedCheckpointStats completed = latestCompletedCheckpoint;
-            if (completed != null) {
-                return completed.getCheckpointedSize();
-            } else {
-                return -1L;
-            }
-        }
-    }
-
-    private class LatestCompletedCheckpointFullSizeGauge implements 
Gauge<Long> {
-        @Override
-        public Long getValue() {
-            CompletedCheckpointStats completed = latestCompletedCheckpoint;
-            if (completed != null) {
-                return completed.getStateSize();
-            } else {
-                return -1L;
-            }
-        }
-    }
-
-    private class LatestCompletedCheckpointDurationGauge implements 
Gauge<Long> {
-        @Override
-        public Long getValue() {
-            CompletedCheckpointStats completed = latestCompletedCheckpoint;
-            if (completed != null) {
-                return completed.getEndToEndDuration();
-            } else {
-                return -1L;
-            }
-        }
-    }
-
-    private class LatestCompletedCheckpointProcessedDataGauge implements 
Gauge<Long> {
-        @Override
-        public Long getValue() {
-            CompletedCheckpointStats completed = latestCompletedCheckpoint;
-            if (completed != null) {
-                return completed.getProcessedData();
-            } else {
-                return -1L;
-            }
-        }
-    }
+    PendingCheckpointStats reportPendingCheckpoint(
+            long checkpointId,
+            long triggerTimestamp,
+            CheckpointProperties props,
+            Map<JobVertexID, Integer> vertexToDop);
 
-    private class LatestCompletedCheckpointPersistedDataGauge implements 
Gauge<Long> {
-        @Override
-        public Long getValue() {
-            CompletedCheckpointStats completed = latestCompletedCheckpoint;
-            if (completed != null) {
-                return completed.getPersistedData();
-            } else {
-                return -1L;
-            }
-        }
-    }
+    void reportFailedCheckpoint(FailedCheckpointStats failed);
 
-    private class LatestCompletedCheckpointExternalPathGauge implements 
Gauge<String> {
-        @Override
-        public String getValue() {
-            CompletedCheckpointStats completed = latestCompletedCheckpoint;
-            if (completed != null && completed.getExternalPath() != null) {
-                return completed.getExternalPath();
-            } else {
-                return "n/a";
-            }
-        }
-    }
+    void reportFailedCheckpointsWithoutInProgress();
 
-    private class LatestCompletedCheckpointIdGauge implements Gauge<Long> {
-        @Override
-        public Long getValue() {
-            CompletedCheckpointStats completed = latestCompletedCheckpoint;
-            if (completed != null) {
-                return completed.getCheckpointId();
-            } else {
-                return -1L;
-            }
-        }
-    }
+    CheckpointStatsSnapshot createSnapshot();

Review Comment:
   You have omitted the Javadoc for some methods, e.g., 
`CheckpointStatsSnapshot createSnapshot()`. Maybe restore them?
   
   ```suggestion
   
       /**
        * Creates a new snapshot of the available stats.
        *
        * @return The latest statistics snapshot.
        */
       CheckpointStatsSnapshot createSnapshot();
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobInitializationMetricsBuilder.java:
##########
@@ -19,40 +19,52 @@
 package org.apache.flink.runtime.checkpoint;
 
 import 
org.apache.flink.runtime.checkpoint.JobInitializationMetrics.SumMaxDuration;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 import static 
org.apache.flink.runtime.checkpoint.JobInitializationMetrics.UNSET;
-import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkState;
 
 class JobInitializationMetricsBuilder {
     private static final Logger LOG =
             LoggerFactory.getLogger(JobInitializationMetricsBuilder.class);
 
-    private final List<SubTaskInitializationMetrics> reportedMetrics = new 
ArrayList<>();
-    private final int totalNumberOfSubTasks;
+    private final Map<ExecutionAttemptID, SubTaskInitializationMetrics> 
reportedMetrics =
+            new HashMap<>();
+    private final Set<ExecutionAttemptID> toInitialize;
     private final long startTs;
     private Optional<Long> stateSize = Optional.empty();
     private Optional<Long> checkpointId = Optional.empty();
     private Optional<CheckpointProperties> checkpointProperties = 
Optional.empty();
     private Optional<String> externalPath = Optional.empty();
 
-    JobInitializationMetricsBuilder(int totalNumberOfSubTasks, long startTs) {
-        checkArgument(totalNumberOfSubTasks > 0);
-        this.totalNumberOfSubTasks = totalNumberOfSubTasks;
+    /**

Review Comment:
   Thanks for the verification. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to