mxm commented on code in PR #941:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/941#discussion_r1943120952
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -757,16 +778,35 @@ public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
dashboardConfiguration.getFlinkRevision());
}
+ }
- var taskManagerReplicas = getTaskManagersInfo(conf).getTaskManagerInfos().size();
- clusterInfo.put(
- FIELD_NAME_TOTAL_CPU,
- String.valueOf(FlinkUtils.calculateClusterCpuUsage(conf, taskManagerReplicas)));
- clusterInfo.put(
- FIELD_NAME_TOTAL_MEMORY,
- String.valueOf(FlinkUtils.calculateClusterMemoryUsage(conf, taskManagerReplicas)));
+ private void populateStateSize(
+ Configuration conf, @Nullable String jobId, Map<String, String> clusterInfo)
+ throws Exception {
+ if (jobId != null) {
+ try (RestClusterClient<String> clusterClient = getClusterClient(conf)) {
+ var checkpointingStatisticsHeaders = CheckpointingStatisticsHeaders.getInstance();
+ var parameters = checkpointingStatisticsHeaders.getUnresolvedMessageParameters();
+ parameters.jobPathParameter.resolve(JobID.fromHexString(jobId));
- return clusterInfo;
+ CheckpointingStatistics checkpointingStatistics =
+ clusterClient
+ .sendRequest(
+ checkpointingStatisticsHeaders,
+ parameters,
+ EmptyRequestBody.getInstance())
+ .get();
+ CheckpointStatistics.CompletedCheckpointStatistics completedCheckpointStatistics =
+ checkpointingStatistics
+ .getLatestCheckpoints()
+ .getCompletedCheckpointStatistics();
+ if (completedCheckpointStatistics != null) {
+ clusterInfo.put(
+ FIELD_NAME_STATE_SIZE,
+ String.valueOf(completedCheckpointStatistics.getCheckpointedSize()));
Review Comment:
Good question. It is a bit tricky to figure out, even after studying the
code. The good news is that there should be a metric that approximates the
actual full state / checkpoint size. It was introduced via
https://issues.apache.org/jira/browse/FLINK-25557.
Turns out, I was using the wrong metric because of a misleading JavaDoc in
CheckpointStatistics. I've updated the PR. The REST API exposes both
`stateSize` and `checkpointedSize`. The former is the full state size, while
the latter is the amount of data persisted by that particular checkpoint,
which varies depending on whether checkpointing is incremental (only the
delta) or not (the full state).
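To make the difference concrete, here is a toy model (hypothetical, not Flink code) that treats state as immutable files, roughly like RocksDB SST files. Under the assumption that an incremental checkpoint only uploads files no earlier checkpoint has persisted, `stateSize` keeps tracking the whole state while `checkpointedSize` only reflects the delta. The class, record, and file names are all made up for illustration.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model, not Flink code: state is a set of immutable files
// (think RocksDB SST files) identified by name, each with a size in bytes.
public class IncrementalCheckpointModel {

    // Sizes reported for one checkpoint.
    public record Sizes(long stateSize, long checkpointedSize) {}

    // Names of files already persisted by an earlier checkpoint.
    private final Set<String> persisted = new HashSet<>();

    // stateSize counts every live file; checkpointedSize counts only the files
    // this checkpoint has to upload (all of them when incremental == false).
    public Sizes checkpoint(Map<String, Long> liveFiles, boolean incremental) {
        long stateSize = 0;
        long checkpointedSize = 0;
        for (Map.Entry<String, Long> file : liveFiles.entrySet()) {
            stateSize += file.getValue();
            if (!incremental || !persisted.contains(file.getKey())) {
                checkpointedSize += file.getValue();
            }
            persisted.add(file.getKey());
        }
        return new Sizes(stateSize, checkpointedSize);
    }

    public static void main(String[] args) {
        IncrementalCheckpointModel model = new IncrementalCheckpointModel();
        Map<String, Long> files = new LinkedHashMap<>();
        files.put("sst-1", 800L);
        files.put("sst-2", 200L);
        // First incremental checkpoint has to upload everything.
        System.out.println(model.checkpoint(files, true)); // Sizes[stateSize=1000, checkpointedSize=1000]
        // A new file appears: the full state grew, but only the new file is uploaded.
        files.put("sst-3", 50L);
        System.out.println(model.checkpoint(files, true)); // Sizes[stateSize=1050, checkpointedSize=50]
    }
}
```

The model ignores details such as shared-file reference counting and compaction; it only shows why the two numbers can diverge by orders of magnitude once checkpoints are incremental.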
StateSize is based on
https://github.com/apache/flink/blob/d0d44d6196c02552179ec23c799d9769e128a8ae/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java#L66.
I'm not 100% sure whether it always yields the exact state size, but it should
be close.
CheckpointedSize is based on
https://github.com/apache/flink/blob/d0d44d6196c02552179ec23c799d9769e128a8ae/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java#L62.
I wonder whether we should expose both.
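If both were exposed, the population step could look roughly like the sketch below. It is self-contained, so `CompletedCheckpoint` is a hypothetical stand-in for `CheckpointStatistics.CompletedCheckpointStatistics`, which is assumed to offer a state-size getter next to the `getCheckpointedSize()` used in the diff. `FIELD_NAME_CHECKPOINTED_SIZE` and both key strings are invented here; the real value of `FIELD_NAME_STATE_SIZE` is not visible in this excerpt.

```java
import java.util.HashMap;
import java.util.Map;

public class StateSizeFields {

    // Hypothetical key values, chosen only for this sketch.
    public static final String FIELD_NAME_STATE_SIZE = "state-size";
    public static final String FIELD_NAME_CHECKPOINTED_SIZE = "checkpointed-size";

    // Hypothetical stand-in for Flink's CompletedCheckpointStatistics.
    public record CompletedCheckpoint(long stateSize, long checkpointedSize) {}

    // Adds both sizes to clusterInfo, or nothing if no checkpoint has completed yet.
    public static void populateStateSizes(
            CompletedCheckpoint latest, Map<String, String> clusterInfo) {
        if (latest == null) {
            return; // mirrors the null check on completedCheckpointStatistics in the diff
        }
        clusterInfo.put(FIELD_NAME_STATE_SIZE, String.valueOf(latest.stateSize()));
        clusterInfo.put(
                FIELD_NAME_CHECKPOINTED_SIZE, String.valueOf(latest.checkpointedSize()));
    }

    public static void main(String[] args) {
        Map<String, String> clusterInfo = new HashMap<>();
        populateStateSizes(new CompletedCheckpoint(1050L, 50L), clusterInfo);
        System.out.println(clusterInfo);
    }
}
```

Keeping the two values under separate keys lets consumers pick whichever meaning they need instead of guessing which size a single field represents.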
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java:
##########
@@ -403,6 +403,28 @@ public static Long calculateClusterMemoryUsage(Configuration conf, int taskManag
return tmTotalMemory + jmTotalMemory;
}
+ public static Long calculateClusterStateSize(Configuration conf, int taskManagerReplicas) {
Review Comment:
State size or checkpoint size isn't directly related to the cluster memory
size. For the heap-based state backend, we would expect the state size to be
lower than the overall memory. For RocksDB, which keeps its state on local
disk, it could even exceed the cluster memory.
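A quick worked comparison illustrates why state size cannot be derived from the memory configuration. All numbers below are hypothetical, and `stateToMemoryRatio` is an illustrative helper, not part of the operator.

```java
public class StateVsMemory {

    private static final long GIB = 1024L * 1024L * 1024L;

    // Ratio of a job's reported state size to the cluster's total memory.
    // Nothing bounds this below 1: it depends on the state backend.
    public static double stateToMemoryRatio(long stateSizeBytes, long clusterMemoryBytes) {
        if (clusterMemoryBytes <= 0) {
            throw new IllegalArgumentException("clusterMemoryBytes must be positive");
        }
        return (double) stateSizeBytes / clusterMemoryBytes;
    }

    public static void main(String[] args) {
        long clusterMemory = 4 * 4 * GIB; // hypothetical: 4 TaskManagers x 4 GiB
        // Hypothetical heap-backend job: state is expected to fit in memory.
        System.out.println(stateToMemoryRatio(6 * GIB, clusterMemory)); // 0.375
        // Hypothetical RocksDB job: state lives mostly on local disk.
        System.out.println(stateToMemoryRatio(48 * GIB, clusterMemory)); // 3.0
    }
}
```

Both jobs run on the same cluster configuration, so a function that only takes `conf` and `taskManagerReplicas` would have to return the same state size for both.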