mateczagany commented on code in PR #821:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1691974263
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -716,6 +695,48 @@ public CheckpointFetchResult fetchCheckpointInfo(
         }
     }
+    @Override
+    public CheckpointStatsResult fetchCheckpointStats(
+            String jobId, Long checkpointId, Configuration conf) {
+        try (RestClusterClient<String> clusterClient = getClusterClient(conf)) {
+            var checkpointStatusHeaders = CheckpointStatisticDetailsHeaders.getInstance();
+            var parameters = checkpointStatusHeaders.getUnresolvedMessageParameters();
+            parameters.jobPathParameter.resolve(JobID.fromHexString(jobId));
+
+            // This was needed because the parameter is protected
+            var checkpointIdPathParameter =
+                    (CheckpointIdPathParameter) Iterables.getLast(parameters.getPathParameters());
+            checkpointIdPathParameter.resolve(checkpointId);
+
+            var response =
+                    clusterClient.sendRequest(
+                            checkpointStatusHeaders, parameters, EmptyRequestBody.getInstance());
+
+            var stats = response.get();
+            if (stats == null) {
+                throw new IllegalStateException("Checkpoint ID %d for job %s does not exist!");
+            } else if (stats instanceof CheckpointStatistics.CompletedCheckpointStatistics) {
+                return CheckpointStatsResult.completed(
+                        ((CheckpointStatistics.CompletedCheckpointStatistics) stats)
+                                .getExternalPath());
+            } else if (stats instanceof CheckpointStatistics.FailedCheckpointStatistics) {
+                return CheckpointStatsResult.error(
+                        ((CheckpointStatistics.FailedCheckpointStatistics) stats)
+                                .getFailureMessage());
+            } else if (stats instanceof CheckpointStatistics.PendingCheckpointStatistics) {
+                return CheckpointStatsResult.pending();
+            } else {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Unknown checkpoint statistics result class: %s",
+                                stats.getClass().getSimpleName()));
+            }
+        } catch (Exception e) {
+            LOG.error("Exception while fetching checkpoint statistics", e);
Review Comment:
The commit linked below makes us ignore errors when fetching checkpoint stats,
since by that point we have already determined that the checkpoint was
successful. In that case we just set an empty path:
https://github.com/apache/flink-kubernetes-operator/pull/821/commits/637b3053b9644b3d6cadd1fb6a05e1f5aab75fa6
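
To illustrate the behaviour described above, here is a minimal, self-contained sketch of the "log and fall back to an empty path" pattern. It is not the code from the linked commit: `CheckpointPathFallback`, `CheckpointOutcome`, and `resolvePath` are hypothetical names made up for this example, and a plain `Callable<String>` stands in for the REST call to the Flink cluster.

```java
import java.util.concurrent.Callable;

// Hypothetical illustration only; none of these names exist in the operator.
final class CheckpointPathFallback {

    /** Stand-in for a result type; only the external path matters here. */
    static final class CheckpointOutcome {
        final String path;

        CheckpointOutcome(String path) {
            this.path = path;
        }
    }

    /**
     * Assumes the checkpoint is already known to have succeeded, so a failure
     * to fetch its statistics is logged and replaced by an empty path instead
     * of being propagated to the caller.
     */
    static CheckpointOutcome resolvePath(Callable<String> statsFetcher) {
        try {
            String path = statsFetcher.call();
            // Treat a missing path the same way as a failed lookup.
            return new CheckpointOutcome(path == null ? "" : path);
        } catch (Exception e) {
            System.err.println("Exception while fetching checkpoint statistics: " + e);
            return new CheckpointOutcome("");
        }
    }

    public static void main(String[] args) {
        // Successful lookup: the fetched path is passed through unchanged.
        System.out.println("[" + resolvePath(() -> "s3://bucket/chk-42").path + "]");
        // Failed lookup: the error is logged and the path is left empty.
        System.out.println("[" + resolvePath(() -> {
            throw new java.io.IOException("REST endpoint unavailable");
        }).path + "]");
    }
}
```

The trade-off in this sketch is that callers cannot tell "stats unavailable" apart from "no external path" by looking at the result alone; the log line is the only trace of the failure.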