mxm commented on code in PR #941:
URL: https://github.com/apache/flink-kubernetes-operator/pull/941#discussion_r1943120952


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -757,16 +778,35 @@ public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
                     DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
                     dashboardConfiguration.getFlinkRevision());
         }
+    }
 
-        var taskManagerReplicas = getTaskManagersInfo(conf).getTaskManagerInfos().size();
-        clusterInfo.put(
-                FIELD_NAME_TOTAL_CPU,
-                String.valueOf(FlinkUtils.calculateClusterCpuUsage(conf, taskManagerReplicas)));
-        clusterInfo.put(
-                FIELD_NAME_TOTAL_MEMORY,
-                String.valueOf(FlinkUtils.calculateClusterMemoryUsage(conf, taskManagerReplicas)));
+    private void populateStateSize(
+            Configuration conf, @Nullable String jobId, Map<String, String> clusterInfo)
+            throws Exception {
+        if (jobId != null) {
+            try (RestClusterClient<String> clusterClient = getClusterClient(conf)) {
+                var checkpointingStatisticsHeaders = CheckpointingStatisticsHeaders.getInstance();
+                var parameters = checkpointingStatisticsHeaders.getUnresolvedMessageParameters();
+                parameters.jobPathParameter.resolve(JobID.fromHexString(jobId));
 
-        return clusterInfo;
+                CheckpointingStatistics checkpointingStatistics =
+                        clusterClient
+                                .sendRequest(
+                                        checkpointingStatisticsHeaders,
+                                        parameters,
+                                        EmptyRequestBody.getInstance())
+                                .get();
+                CheckpointStatistics.CompletedCheckpointStatistics completedCheckpointStatistics =
+                        checkpointingStatistics
+                                .getLatestCheckpoints()
+                                .getCompletedCheckpointStatistics();
+                if (completedCheckpointStatistics != null) {
+                    clusterInfo.put(
+                            FIELD_NAME_STATE_SIZE,
+                            String.valueOf(completedCheckpointStatistics.getCheckpointedSize()));

Review Comment:
   Good question. It is a bit tricky to find this out, even after studying the 
code. The good news is that there should be a metric that approximates the 
actual full state / checkpoint size. It was introduced via 
https://issues.apache.org/jira/browse/FLINK-25557.
   
   It turns out I was using the wrong metric because of a misleading JavaDoc in 
CheckpointStatistics. I've updated the PR. The REST API exposes both 
`stateSize` and `checkpointedSize`. The former is the full state size, while 
the latter is the checkpointed size, which varies depending on whether 
checkpointing is incremental or not. 
   
   StateSize is based on 
https://github.com/apache/flink/blob/d0d44d6196c02552179ec23c799d9769e128a8ae/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java#L66.
 I'm not 100% sure whether it always yields the exact state size but it should 
be close.
   
   CheckpointedSize is based on 
https://github.com/apache/flink/blob/d0d44d6196c02552179ec23c799d9769e128a8ae/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java#L62.
   
   I wonder whether we should expose both?
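
To make the "expose both" option concrete, here is a minimal, self-contained sketch of what populating both values could look like. It is not the PR's code: `CompletedCheckpoint` is a hypothetical stand-in for the completed-checkpoint statistics returned by the REST API, and both key strings are assumed for illustration (the diff only shows the constant name `FIELD_NAME_STATE_SIZE`, not its value).

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class StateSizeInfoSketch {
    // Hypothetical key values, chosen only for this sketch.
    static final String FIELD_NAME_STATE_SIZE = "state-size";
    static final String FIELD_NAME_CHECKPOINTED_SIZE = "checkpointed-size";

    /** Hypothetical stand-in for the statistics of the latest completed checkpoint. */
    static final class CompletedCheckpoint {
        final long stateSize;        // full state size
        final long checkpointedSize; // bytes persisted by this checkpoint (smaller when incremental)

        CompletedCheckpoint(long stateSize, long checkpointedSize) {
            this.stateSize = stateSize;
            this.checkpointedSize = checkpointedSize;
        }
    }

    /** Adds both sizes to clusterInfo; does nothing if no checkpoint has completed yet. */
    static void populateSizes(CompletedCheckpoint latest, Map<String, String> clusterInfo) {
        if (latest == null) {
            return;
        }
        clusterInfo.put(FIELD_NAME_STATE_SIZE, String.valueOf(latest.stateSize));
        clusterInfo.put(FIELD_NAME_CHECKPOINTED_SIZE, String.valueOf(latest.checkpointedSize));
    }

    public static void main(String[] args) {
        Map<String, String> clusterInfo = new LinkedHashMap<>();
        // Made-up numbers: an incremental checkpoint that persisted far less than the full state.
        populateSizes(new CompletedCheckpoint(10_000_000L, 250_000L), clusterInfo);
        System.out.println(clusterInfo);
    }
}
```

Keeping the two values under separate keys would let consumers pick the full state size for capacity questions and the checkpointed size for per-checkpoint I/O, without either being mistaken for the other.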
   
   



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java:
##########
@@ -403,6 +403,28 @@ public static Long calculateClusterMemoryUsage(Configuration conf, int taskManag
         return tmTotalMemory + jmTotalMemory;
     }
 
+    public static Long calculateClusterStateSize(Configuration conf, int taskManagerReplicas) {

Review Comment:
   State size or checkpoint size isn't directly related to the cluster memory 
size. For the heap memory backend, we would expect the state size to be lower 
than the overall memory. For RocksDB, it could even exceed the cluster memory.


