Aitozi commented on code in PR #255:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/255#discussion_r890243163
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -69,96 +69,78 @@ public void observeSavepointStatus(
.map(Savepoint::getLocation)
.orElse(null);
- observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
- .ifPresent(
- err ->
- EventUtils.createOrUpdateEvent(
- flinkService.getKubernetesClient(),
- resource,
- EventUtils.Type.Warning,
- "SavepointError",
- SavepointUtils.createSavepointError(
- savepointInfo,
- resource.getSpec()
- .getJob()
-
.getSavepointTriggerNonce()),
- EventUtils.Component.Operator));
-
- // We only need to observe latest checkpoint/savepoint for terminal
jobs
- if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState())
{
- observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
+ // If any manual or periodic savepoint is in progress, observe it
+ if (SavepointUtils.savepointInProgress(jobStatus)) {
+ observeTriggeredSavepoint(resource, jobId, deployedConfig);
}
- var currentLastSpPath =
- Optional.ofNullable(savepointInfo.getLastSavepoint())
- .map(Savepoint::getLocation)
- .orElse(null);
-
- // If the last savepoint information changes we need to patch the
status
- // to avoid losing this in case of an operator failure after the
cluster was shut down
- if (currentLastSpPath != null &&
!currentLastSpPath.equals(previousLastSpPath)) {
- LOG.info(
- "Updating resource status after observing new last
savepoint {}",
- currentLastSpPath);
- statusHelper.patchAndCacheStatus(resource);
+ // If job is in globally terminal state, observe last savepoint
+ if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
+ observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
}
+
+ patchStatusOnSavepointChange(resource, savepointInfo,
previousLastSpPath);
}
/**
* Observe the savepoint result based on the current savepoint info.
*
- * @param currentSavepointInfo the current savepoint info.
+ * @param resource the resource being observed
* @param jobID the jobID of the observed job.
* @param deployedConfig Deployed job config.
* @return The observed error, if no error observed, {@code
Optional.empty()} will be returned.
*/
- private Optional<String> observeTriggeredSavepointProgress(
- SavepointInfo currentSavepointInfo, String jobID, Configuration
deployedConfig) {
- if (StringUtils.isEmpty(currentSavepointInfo.getTriggerId())) {
- LOG.debug("Savepoint not in progress");
- return Optional.empty();
- }
+ private void observeTriggeredSavepoint(
+ AbstractFlinkResource<?, ?> resource, String jobID, Configuration
deployedConfig) {
+
+ var savepointInfo =
resource.getStatus().getJobStatus().getSavepointInfo();
+
LOG.info("Observing savepoint status.");
- SavepointFetchResult savepointFetchResult =
+ var savepointFetchResult =
flinkService.fetchSavepointInfo(
- currentSavepointInfo.getTriggerId(), jobID,
deployedConfig);
+ savepointInfo.getTriggerId(), jobID, deployedConfig);
if (savepointFetchResult.isPending()) {
Review Comment:
If the savepoint is still pending, the grace period is no longer checked, so
the savepoint may now take longer than the grace period.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -69,96 +69,78 @@ public void observeSavepointStatus(
.map(Savepoint::getLocation)
.orElse(null);
- observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
- .ifPresent(
- err ->
- EventUtils.createOrUpdateEvent(
- flinkService.getKubernetesClient(),
- resource,
- EventUtils.Type.Warning,
- "SavepointError",
- SavepointUtils.createSavepointError(
- savepointInfo,
- resource.getSpec()
- .getJob()
-
.getSavepointTriggerNonce()),
- EventUtils.Component.Operator));
-
- // We only need to observe latest checkpoint/savepoint for terminal
jobs
- if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState())
{
- observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
+ // If any manual or periodic savepoint is in progress, observe it
+ if (SavepointUtils.savepointInProgress(jobStatus)) {
+ observeTriggeredSavepoint(resource, jobId, deployedConfig);
}
- var currentLastSpPath =
- Optional.ofNullable(savepointInfo.getLastSavepoint())
- .map(Savepoint::getLocation)
- .orElse(null);
-
- // If the last savepoint information changes we need to patch the
status
- // to avoid losing this in case of an operator failure after the
cluster was shut down
- if (currentLastSpPath != null &&
!currentLastSpPath.equals(previousLastSpPath)) {
- LOG.info(
- "Updating resource status after observing new last
savepoint {}",
- currentLastSpPath);
- statusHelper.patchAndCacheStatus(resource);
+ // If job is in globally terminal state, observe last savepoint
+ if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
+ observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
}
+
+ patchStatusOnSavepointChange(resource, savepointInfo,
previousLastSpPath);
}
/**
* Observe the savepoint result based on the current savepoint info.
*
- * @param currentSavepointInfo the current savepoint info.
+ * @param resource the resource being observed
* @param jobID the jobID of the observed job.
* @param deployedConfig Deployed job config.
* @return The observed error, if no error observed, {@code
Optional.empty()} will be returned.
Review Comment:
The method has no return value now, so this `@return` tag is stale.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -69,96 +69,78 @@ public void observeSavepointStatus(
.map(Savepoint::getLocation)
.orElse(null);
- observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
- .ifPresent(
- err ->
- EventUtils.createOrUpdateEvent(
- flinkService.getKubernetesClient(),
- resource,
- EventUtils.Type.Warning,
- "SavepointError",
- SavepointUtils.createSavepointError(
- savepointInfo,
- resource.getSpec()
- .getJob()
-
.getSavepointTriggerNonce()),
- EventUtils.Component.Operator));
-
- // We only need to observe latest checkpoint/savepoint for terminal
jobs
- if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState())
{
- observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
+ // If any manual or periodic savepoint is in progress, observe it
+ if (SavepointUtils.savepointInProgress(jobStatus)) {
+ observeTriggeredSavepoint(resource, jobId, deployedConfig);
}
- var currentLastSpPath =
- Optional.ofNullable(savepointInfo.getLastSavepoint())
- .map(Savepoint::getLocation)
- .orElse(null);
-
- // If the last savepoint information changes we need to patch the
status
- // to avoid losing this in case of an operator failure after the
cluster was shut down
- if (currentLastSpPath != null &&
!currentLastSpPath.equals(previousLastSpPath)) {
- LOG.info(
- "Updating resource status after observing new last
savepoint {}",
- currentLastSpPath);
- statusHelper.patchAndCacheStatus(resource);
+ // If job is in globally terminal state, observe last savepoint
+ if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
+ observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
}
+
+ patchStatusOnSavepointChange(resource, savepointInfo,
previousLastSpPath);
Review Comment:
Do we have to put this in a `finally` block to ensure it is always done?
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -58,9 +58,11 @@ public class KubernetesOperatorConfigOptions {
"The interval for observing status for in-progress
operations such as deployment and savepoints.");
public static final ConfigOption<Duration>
OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD =
Review Comment:
The variable name could also be refactored accordingly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]