dmvk commented on code in PR #22901:
URL: https://github.com/apache/flink/pull/22901#discussion_r1251615108


##########
flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java:
##########
@@ -193,6 +193,28 @@ public class RestOptions {
                                     + "Lowering the thread priority will give Flink's main components more CPU time whereas "
                                     + "increasing will allocate more time for the REST server's processing.");
 
+    /**
+     * Duration from write after which cached checkpoints statistics are cleaned up. It can be
+     * specified using notation: "500 ms", "1 s".
+     */
+    @Documentation.Section(Documentation.Sections.EXPERT_REST)
+    public static final ConfigOption<Duration> CACHE_CHECKPOINT_STATISTICS_TIMEOUT =
+            key("rest.cache.checkpoint-statistics.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(3))
+                    .withDescription(
+                            "Duration from write after which cached checkpoints statistics are cleaned up. It can"
+                                    + " be specified using notation: \"500 ms\", \"1 s\".");

Review Comment:
   We shouldn't be explaining how to specify durations here. If something needs clarifying, we should write a generalized explainer, because it applies to many fields.
   ```suggestion
                       .withDescription(
                               "Duration from write, after which cached checkpoints statistics are cleaned up.");
   ```
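   To illustrate the point that the notation belongs in one shared place rather than in each option's description, here is a minimal Java sketch of a single helper that understands the `<amount> <unit>` form from the removed sentence ("500 ms", "1 s"). The class `DurationNotation` and its unit subset (`ms`, `s`, `min`) are hypothetical and chosen only for illustration; this is not Flink's own duration parser.

```java
import java.time.Duration;
import java.util.Locale;

// Hypothetical shared helper: documents the "<amount> <unit>" notation once
// instead of repeating it in every option description. Simplified sketch that
// accepts only "ms", "s" and "min"; it is not Flink's own parser.
final class DurationNotation {

    private DurationNotation() {}

    public static Duration parse(String text) {
        String trimmed = text.trim().toLowerCase(Locale.ROOT);

        // Consume the leading digits as the amount.
        int end = 0;
        while (end < trimmed.length() && Character.isDigit(trimmed.charAt(end))) {
            end++;
        }
        if (end == 0) {
            throw new IllegalArgumentException("Missing amount in: " + text);
        }
        long amount = Long.parseLong(trimmed.substring(0, end));

        // Whatever follows the digits (ignoring spaces) is the unit.
        String unit = trimmed.substring(end).trim();
        switch (unit) {
            case "ms":
                return Duration.ofMillis(amount);
            case "s":
                return Duration.ofSeconds(amount);
            case "min":
                return Duration.ofMinutes(amount);
            default:
                throw new IllegalArgumentException("Unsupported unit '" + unit + "' in: " + text);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("500 ms")); // PT0.5S
        System.out.println(parse("1 s")); // PT1S
    }
}
```

   With one such explainer (in code or in the docs), individual descriptions can stay as short as the suggested one-liner.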



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java:
##########
@@ -96,6 +97,15 @@ public interface JobManagerRunner extends AutoCloseableAsync {
      */
     CompletableFuture<ExecutionGraphInfo> requestJob(Time timeout);
 
+    /**
+     * Requests the {@link CheckpointStatsSnapshot} of the executed job.
+     *
+     * @param timeout for the rpc call
+     * @return Future which is completed with the {@link CheckpointStatsSnapshot} of the executed
+     *     job. Returns null if job is not running.
+     */
+    CompletableFuture<CheckpointStatsSnapshot> requestCheckpointStats(Time timeout);

Review Comment:
   Why is this needed?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java:
##########
@@ -93,6 +94,8 @@ ExecutionState requestPartitionState(
 
     ExecutionGraphInfo requestJob();
 
+    CheckpointStatsSnapshot requestCheckpointStats();

Review Comment:
   Why do we need this? Can JM access this through the ExecutionGraphInfo?
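   One reading of the question: the stats could be projected out of the future that `requestJob` already returns, so no new interface method is needed. The sketch below assumes the graph info carries an archived graph that exposes the checkpoint stats; all types (`StatsSnapshot`, `ArchivedGraph`, `GraphInfo`, `Runner`) are hypothetical stand-ins, not Flink's classes.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins, not Flink's classes: they only model "graph info
// carries an archived graph, which carries the checkpoint stats".
final class CheckpointStatsFromGraphInfo {

    static final class StatsSnapshot {
        final long completedCheckpoints;

        StatsSnapshot(long completedCheckpoints) {
            this.completedCheckpoints = completedCheckpoints;
        }
    }

    static final class ArchivedGraph {
        final StatsSnapshot checkpointStats;

        ArchivedGraph(StatsSnapshot checkpointStats) {
            this.checkpointStats = checkpointStats;
        }
    }

    static final class GraphInfo {
        final ArchivedGraph archivedGraph;

        GraphInfo(ArchivedGraph archivedGraph) {
            this.archivedGraph = archivedGraph;
        }
    }

    // Models the existing requestJob(timeout) call.
    interface Runner {
        CompletableFuture<GraphInfo> requestJob();
    }

    private CheckpointStatsFromGraphInfo() {}

    // Derives the stats from the existing future instead of adding a new RPC method.
    static CompletableFuture<StatsSnapshot> requestCheckpointStats(Runner runner) {
        return runner.requestJob().thenApply(info -> info.archivedGraph.checkpointStats);
    }
}
```

   Whether a dedicated method is still justified (for example, to avoid building the full graph info just to read the stats) is the open question in this thread; the sketch does not settle it.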



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -937,6 +938,15 @@ public CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(
                 .exceptionally(checkExecutionGraphStoreOnException);
     }
 
+    @Override
+    public CompletableFuture<CheckpointStatsSnapshot> requestCheckpointStats(
+            JobID jobId, Time timeout) {
+        Optional<JobManagerRunner> maybeJob = getJobManagerRunner(jobId);
+
+        return maybeJob.map(job -> job.requestCheckpointStats(timeout))
+                .orElse(FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)));

Review Comment:
   
   `org.apache.flink.runtime.dispatcher.Dispatcher#performOperationOnJobMasterGateway` should be used here instead.
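   The idea is to route every job-scoped call through one shared helper instead of repeating the lookup and the not-found failure in each method. Below is a simplified, synchronous sketch of that pattern; `GatewayDispatcher`, `Gateway`, `JobNotFoundException` and `performOperationOnGateway` are hypothetical stand-ins, and the real `performOperationOnJobMasterGateway` may resolve the gateway asynchronously.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical, simplified model of "perform an operation on the job's gateway".
final class GatewayDispatcher {

    // Stand-in for the job master gateway.
    interface Gateway {
        CompletableFuture<String> requestCheckpointStats();
    }

    // Stand-in for a "job not found" exception.
    static final class JobNotFoundException extends Exception {
        JobNotFoundException(String jobId) {
            super("Could not find job " + jobId);
        }
    }

    private final Map<String, Gateway> gateways = new ConcurrentHashMap<>();

    void register(String jobId, Gateway gateway) {
        gateways.put(jobId, gateway);
    }

    // Single place that resolves the gateway and handles the unknown-job case.
    <T> CompletableFuture<T> performOperationOnGateway(
            String jobId, Function<Gateway, CompletableFuture<T>> operation) {
        Gateway gateway = gateways.get(jobId);
        if (gateway == null) {
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(new JobNotFoundException(jobId));
            return failed;
        }
        return operation.apply(gateway);
    }

    // Each job-scoped request becomes a one-liner on top of the shared helper.
    CompletableFuture<String> requestCheckpointStats(String jobId) {
        return performOperationOnGateway(jobId, Gateway::requestCheckpointStats);
    }
}
```

   Keeping the lookup in one helper means error handling for unknown jobs stays consistent across all Dispatcher RPCs.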



##########
flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java:
##########
@@ -193,6 +193,28 @@ public class RestOptions {
                                     + "Lowering the thread priority will give Flink's main components more CPU time whereas "
                                     + "increasing will allocate more time for the REST server's processing.");
 
+    /**
+     * Duration from write after which cached checkpoints statistics are cleaned up. It can be
+     * specified using notation: "500 ms", "1 s".

Review Comment:
   ```suggestion
        * Duration from write, after which cached checkpoints statistics are cleaned up.
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java:
##########
@@ -21,17 +21,22 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.util.Preconditions;
 
 import java.io.File;
+import java.time.Duration;
 
 /** Configuration object containing values for the rest handler configuration. */
 public class RestHandlerConfiguration {
 
     private final long refreshInterval;
 
     private final int maxCheckpointStatisticCacheEntries;
+    private final Duration checkpointStatsSnapshotCacheExpireAfterWrite;
+
+    private final int checkpointStatsSnapshotCacheSize;

Review Comment:
   nit: the spacing here is rather inconsistent.
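   The two new fields configure a checkpoint-stats cache: an expire-after-write duration (default 3 s via `rest.cache.checkpoint-statistics.timeout`) and a maximum size. The excerpt does not show how the cache is built, so here is a JDK-only sketch of those semantics with an injectable clock for testing; `ExpireAfterWriteCache` and its methods are hypothetical, and its field declarations are spaced uniformly in the spirit of the nit.

```java
import java.time.Duration;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.function.LongSupplier;

// Hypothetical cache driven by the two settings: an entry expires once
// expireAfterWrite has elapsed since it was written, and at most maxSize
// entries are kept (the oldest write is evicted first).
final class ExpireAfterWriteCache<K, V> {

    private static final class Timestamped<T> {
        final T value;
        final long writtenAtNanos;

        Timestamped(T value, long writtenAtNanos) {
            this.value = value;
            this.writtenAtNanos = writtenAtNanos;
        }
    }

    private final long expireAfterWriteNanos;
    private final int maxSize;
    private final LongSupplier nanoClock;
    // Insertion order equals write order, so the first key is the oldest write.
    private final LinkedHashMap<K, Timestamped<V>> entries = new LinkedHashMap<>();

    ExpireAfterWriteCache(Duration expireAfterWrite, int maxSize, LongSupplier nanoClock) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.expireAfterWriteNanos = expireAfterWrite.toNanos();
        this.maxSize = maxSize;
        this.nanoClock = nanoClock;
    }

    public synchronized void put(K key, V value) {
        // Removing first moves a rewritten key to the end of the write order.
        entries.remove(key);
        entries.put(key, new Timestamped<>(value, nanoClock.getAsLong()));
        Iterator<K> oldestFirst = entries.keySet().iterator();
        while (entries.size() > maxSize) {
            oldestFirst.next();
            oldestFirst.remove();
        }
    }

    // Returns the cached value, or null if absent or written at least expireAfterWrite ago.
    public synchronized V getIfPresent(K key) {
        Timestamped<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (nanoClock.getAsLong() - entry.writtenAtNanos >= expireAfterWriteNanos) {
            entries.remove(key);
            return null;
        }
        return entry.value;
    }
}
```

   In production the clock would be `System::nanoTime`; a real implementation would more likely reuse an existing cache library, and the hand-rolled eviction here only makes the semantics explicit.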


