hlteoh37 commented on code in PR #22901:
URL: https://github.com/apache/flink/pull/22901#discussion_r1252040131


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java:
##########
@@ -96,6 +97,15 @@ public interface JobManagerRunner extends AutoCloseableAsync {
      */
     CompletableFuture<ExecutionGraphInfo> requestJob(Time timeout);
 
+    /**
+     * Requests the {@link CheckpointStatsSnapshot} of the executed job.
+     *
+     * @param timeout for the rpc call
+     * @return Future which is completed with the {@link CheckpointStatsSnapshot} of the executed
+     *     job. Returns null if job is not running.
+     */
+    CompletableFuture<CheckpointStatsSnapshot> requestCheckpointStats(Time timeout);

Review Comment:
   This particular change is required if we want to request just the `CheckpointStatsSnapshot` object from the JobMaster and sidestep the recursive copy of the ArchivedExecutionGraph on the JM side.
   
   Currently, the process is:
   Client -> REST (cache ExecutionGraph) -> JobManager -> JobMaster (builds ArchivedExecutionGraph - recursive copy, costly(?))
   
   Without this change, the process will be:
   Client -> REST (cache CheckpointStatsSnapshot) -> JobManager (request ExecutionGraphInfo) -> JobMaster (builds ArchivedExecutionGraph - recursive copy, costly(?))
   
   With this change, the process will be:
   Client -> REST (cache CheckpointStatsSnapshot) -> JobManager (request CheckpointStatsSnapshot) -> JobMaster (retrieves CheckpointStatsSnapshot from cache in CheckpointStatsTracker)
   
   
   I think this change is a useful step toward refactoring the `JobManager` / `JobMaster` API so that it retrieves specific information about a job, rather than one large data dump (ExecutionGraph). Let me know what you think.
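
   To illustrate the REST-side caching step in the last flow, here is a minimal sketch of an expire-after-write cache in plain Java. It is not the PR's implementation (the handler in this PR imports Guava's `Cache`). `SnapshotCacheSketch`, its `get` method, the injected clock, and the loader function are all hypothetical names, with the loader standing in for the narrower `requestCheckpointStats` call.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical sketch, not Flink code: a tiny expire-after-write cache.
final class SnapshotCacheSketch<K, V> {

    // A cached value together with the time it was written.
    private static final class Entry<V> {
        final V value;
        final long writtenAtNanos;

        Entry(V value, long writtenAtNanos) {
            this.value = value;
            this.writtenAtNanos = writtenAtNanos;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long ttlNanos;
    private final LongSupplier nanoClock;

    SnapshotCacheSketch(Duration expireAfterWrite, LongSupplier nanoClock) {
        this.ttlNanos = expireAfterWrite.toNanos();
        this.nanoClock = nanoClock;
    }

    // Returns the cached value; calls the loader only if the entry is missing or expired.
    V get(K key, Function<K, V> loader) {
        long now = nanoClock.getAsLong();
        return entries.compute(
                        key,
                        (k, old) ->
                                old != null && now - old.writtenAtNanos < ttlNanos
                                        ? old
                                        : new Entry<V>(loader.apply(k), now))
                .value;
    }

    public static void main(String[] args) {
        SnapshotCacheSketch<String, String> cache =
                new SnapshotCacheSketch<>(Duration.ofSeconds(3), System::nanoTime);
        // The loader stands in for the RPC that fetches only the checkpoint stats.
        Function<String, String> loader = jobId -> "stats for " + jobId;
        System.out.println(cache.get("job-1", loader));
        System.out.println(cache.get("job-1", loader)); // second call is served from the cache
    }
}
```

   With a cache like this in front of the gateway, repeated REST calls within the expiry window never reach the JobMaster, and a cache miss costs only the narrow stats request rather than a full graph copy.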
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java:
##########
@@ -93,6 +94,8 @@ ExecutionState requestPartitionState(
 
     ExecutionGraphInfo requestJob();
 
+    CheckpointStatsSnapshot requestCheckpointStats();

Review Comment:
   Same as above. Yes, the JM can access this via ExecutionGraphInfo, but that means we would incur the cost of a recursive copy of the ExecutionGraph when all we need is the CheckpointStatsSnapshot object.
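
   As a rough illustration of that trade-off, the sketch below puts a broad accessor that copies on every call next to a narrow accessor that returns an already-built immutable object. All names (`NarrowAccessorSketch`, `Scheduler`, `StatsSnapshot`, `graphCopies`) are hypothetical, and the flat list copy is only a simplified stand-in for the recursive ArchivedExecutionGraph copy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not Flink code: a narrow accessor next to a full-copy accessor.
final class NarrowAccessorSketch {

    // Stand-in for an immutable stats object such as CheckpointStatsSnapshot.
    static final class StatsSnapshot {
        final long completedCheckpoints;

        StatsSnapshot(long completedCheckpoints) {
            this.completedCheckpoints = completedCheckpoints;
        }
    }

    // Stand-in for a scheduler that owns a large graph and a cached stats snapshot.
    static final class Scheduler {
        private final List<String> vertices;
        private final StatsSnapshot latestStats;
        private int graphCopies = 0;

        Scheduler(List<String> vertices, StatsSnapshot latestStats) {
            this.vertices = vertices;
            this.latestStats = latestStats;
        }

        // Broad accessor: copies the whole graph on every call.
        List<String> requestJob() {
            graphCopies++;
            return new ArrayList<>(vertices);
        }

        // Narrow accessor: returns the existing immutable snapshot without copying the graph.
        StatsSnapshot requestCheckpointStats() {
            return latestStats;
        }

        int graphCopies() {
            return graphCopies;
        }
    }

    public static void main(String[] args) {
        Scheduler scheduler =
                new Scheduler(Arrays.asList("source", "map", "sink"), new StatsSnapshot(7));
        System.out.println(scheduler.requestCheckpointStats().completedCheckpoints);
        System.out.println("graph copies so far: " + scheduler.graphCopies());
    }
}
```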



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandler.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.rest.NotFoundException;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+
+/**
+ * Abstract class for checkpoint handlers that will cache the {@link CheckpointStatsSnapshot}
+ * object.
+ *
+ * @param <R> the response type
+ * @param <M> the message parameters
+ */
+public abstract class AbstractCheckpointStatsHandler<

Review Comment:
   Good callout.
   
   This seems to be missing from all REST API classes. I've used `@Internal` for this class and created a follow-up JIRA: https://issues.apache.org/jira/browse/FLINK-32537



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java:
##########
@@ -21,17 +21,22 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.util.Preconditions;
 
 import java.io.File;
+import java.time.Duration;
 
 /** Configuration object containing values for the rest handler configuration. */
 public class RestHandlerConfiguration {
 
     private final long refreshInterval;
 
     private final int maxCheckpointStatisticCacheEntries;
+    private final Duration checkpointStatsSnapshotCacheExpireAfterWrite;
+
+    private final int checkpointStatsSnapshotCacheSize;

Review Comment:
   OK, fixed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java:
##########
@@ -46,6 +51,8 @@ public class RestHandlerConfiguration {
     public RestHandlerConfiguration(
             long refreshInterval,
             int maxCheckpointStatisticCacheEntries,
+            Duration checkpointStatsSnapshotCacheExpireAfterWrite,
+            int checkpointStatsSnapshotCacheSize,

Review Comment:
   OK, renamed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
