ferenc-csaky commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1687799832


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -426,6 +438,14 @@ public static String operatorConfigKey(String key) {
                     .withDescription(
                             "Max allowed checkpoint age for initiating 
last-state upgrades on running jobs. If a checkpoint is not available within 
the desired age (and nothing in progress) a savepoint will be triggered.");
 
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Boolean> 
OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE =
+            operatorConfig("savepoint.dispose-on-delete")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Savepoint data for FlinkStateSnapshot resources 
created by the operator during upgrades and periodic savepoints will be 
disposed of automatically when the generated Kubernetes resource is deleted.");

Review Comment:
   nit: "...savepoints will be disposed ~of~ automatically when..."



##########
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkStateSnapshotState.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.status;
+
+import org.apache.flink.annotation.Experimental;
+
+/** Describes current snapshot state. */
+@Experimental
+public enum FlinkStateSnapshotState {

Review Comment:
   Maybe move this inside `FlinkStateSnapshotStatus`, and then the enum 
name could simply be `State`. This might improve overall readability and help 
us avoid the somewhat confusing wording here.
   
   I see that `ReconciliationStatus` and `ReconciliationState` follow the 
current setup and this is just set up the same way, but the wording is 
straightforward there.
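
   For illustration, a minimal sketch of the nested layout. The class below is 
a hypothetical stand-in for `FlinkStateSnapshotStatus`; only `IN_PROGRESS` 
appears in this PR's diff, the other constants and the accessors are assumed:
   ```java
   // Hypothetical stand-in for FlinkStateSnapshotStatus (assumption).
   class SnapshotStatusSketch {

       /** Nested enum, referenced as SnapshotStatusSketch.State. */
       enum State {
           IN_PROGRESS,
           COMPLETED, // assumed constant, for illustration only
           FAILED // assumed constant, for illustration only
       }

       private State state = State.IN_PROGRESS;

       State getState() {
           return state;
       }

       void setState(State state) {
           this.state = state;
       }
   }
   ```
   Call sites would then read `SnapshotStatusSketch.State.IN_PROGRESS`, which 
keeps the "status" and "state" wording apart.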



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -297,6 +297,18 @@ public static String operatorConfigKey(String key) {
                             "Custom HTTP header for HttpArtifactFetcher. The 
header will be applied when getting the session job artifacts. "
                                     + "Expected format: 
headerKey1:headerValue1,headerKey2:headerValue2.");
 
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Boolean> SNAPSHOT_RESOURCE_ENABLED =
+            operatorConfig("snapshot.resource.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Create new FlinkStateSnapshot resources for 
storing snapshots. "
+                                    + "Disable if you wish to use the 
deprecated mode and save snapshot results to "
+                                    + "FlinkDeployment/FlinkSessionJob status 
fields. The Operator will fallback to the "

Review Comment:
   nit: "The Operator will fallback to ~the~"



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
+import 
org.apache.flink.kubernetes.operator.observer.snapshot.StateSnapshotObserver;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import 
org.apache.flink.kubernetes.operator.reconciler.snapshot.StateSnapshotReconciler;
+import 
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
+import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
+
+import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** Controller that runs the main reconcile loop for {@link 
FlinkStateSnapshot}. */
+@ControllerConfiguration
+public class FlinkStateSnapshotController
+        implements Reconciler<FlinkStateSnapshot>,
+                ErrorStatusHandler<FlinkStateSnapshot>,
+                EventSourceInitializer<FlinkStateSnapshot>,
+                Cleaner<FlinkStateSnapshot> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkStateSnapshotController.class);
+
+    private final Set<FlinkResourceValidator> validators;
+    private final FlinkResourceContextFactory ctxFactory;
+    private final StateSnapshotReconciler reconciler;
+    private final StateSnapshotObserver observer;
+    private final EventRecorder eventRecorder;
+    private final StatusRecorder<FlinkStateSnapshot, FlinkStateSnapshotStatus> 
statusRecorder;
+
+    public FlinkStateSnapshotController(
+            Set<FlinkResourceValidator> validators,
+            FlinkResourceContextFactory ctxFactory,
+            StateSnapshotReconciler reconciler,
+            StateSnapshotObserver observer,
+            EventRecorder eventRecorder,
+            StatusRecorder<FlinkStateSnapshot, FlinkStateSnapshotStatus> 
statusRecorder) {
+        this.validators = validators;
+        this.ctxFactory = ctxFactory;
+        this.reconciler = reconciler;
+        this.observer = observer;
+        this.eventRecorder = eventRecorder;
+        this.statusRecorder = statusRecorder;
+    }
+
+    @Override
+    public UpdateControl<FlinkStateSnapshot> reconcile(
+            FlinkStateSnapshot flinkStateSnapshot, Context<FlinkStateSnapshot> 
josdkContext) {
+        // status might be null here
+        flinkStateSnapshot.setStatus(
+                Objects.requireNonNullElseGet(
+                        flinkStateSnapshot.getStatus(), 
FlinkStateSnapshotStatus::new));
+        var ctx = ctxFactory.getFlinkStateSnapshotContext(flinkStateSnapshot, 
josdkContext);
+
+        observer.observe(ctx);
+
+        if (validateSnapshot(ctx)) {
+            reconciler.reconcile(ctx);
+        }
+
+        notifyListeners(ctx);
+        return getUpdateControl(ctx);
+    }
+
+    @Override
+    public DeleteControl cleanup(
+            FlinkStateSnapshot flinkStateSnapshot, Context<FlinkStateSnapshot> 
josdkContext) {
+        var ctx = ctxFactory.getFlinkStateSnapshotContext(flinkStateSnapshot, 
josdkContext);
+        DeleteControl deleteControl;

Review Comment:
   This local variable is unnecessary. We could return inside the `try`, as the 
`catch` has its own return as well.
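
   A rough sketch of the shape I mean. The rest of `cleanup` is not visible in 
this hunk, so `dispose` and `CleanupResult` below are hypothetical stand-ins 
(the real method returns JOSDK's `DeleteControl`):
   ```java
   // Hypothetical stand-in for the JOSDK DeleteControl result (assumption).
   enum CleanupResult {
       DELETE,
       RETRY
   }

   class CleanupSketch {

       /** Simulated cleanup step; throws when the flag is set. */
       static void dispose(boolean fail) throws Exception {
           if (fail) {
               throw new Exception("dispose failed");
           }
       }

       /** Both branches return directly, so no local variable is needed. */
       static CleanupResult cleanup(boolean fail) {
           try {
               dispose(fail);
               return CleanupResult.DELETE;
           } catch (Exception e) {
               return CleanupResult.RETRY;
           }
       }
   }
   ```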



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.JobKind;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+import java.util.Optional;
+
+/** Context for reconciling a snapshot. */
+@Getter
+@RequiredArgsConstructor
+public class FlinkStateSnapshotContext {
+
+    private final FlinkStateSnapshot resource;
+    private final FlinkStateSnapshotStatus originalStatus;
+    private final Context<FlinkStateSnapshot> josdkContext;
+    private final FlinkConfigManager configManager;
+
+    private FlinkOperatorConfiguration operatorConfig;
+    private Configuration referencedJobObserveConfig;
+    private FlinkDeployment referencedJobFlinkDeployment;
+
+    /**
+     * @return Operator configuration for this resource.
+     */
+    public FlinkOperatorConfiguration getOperatorConfig() {
+        if (operatorConfig != null) {
+            return operatorConfig;
+        }
+        return operatorConfig =
+                configManager.getOperatorConfiguration(
+                        getResource().getMetadata().getNamespace(), null);
+    }
+
+    public Optional<AbstractFlinkResource<?, ?>> getSecondaryResource() {
+        var jobRef = getResource().getSpec().getJobReference();
+        if (jobRef == null) {
+            return Optional.empty();
+        }
+
+        if (JobKind.FLINK_DEPLOYMENT.equals(jobRef.getKind())) {
+            return 
getJosdkContext().getSecondaryResource(FlinkDeployment.class).map(r -> r);
+        } else if (JobKind.FLINK_SESSION_JOB.equals(jobRef.getKind())) {
+            return 
getJosdkContext().getSecondaryResource(FlinkSessionJob.class).map(r -> r);
+        } else {
+            return Optional.empty();
+        }

Review Comment:
   nit: We can eliminate the first null-check, so the flow will be more 
straightforward, but getting the `jobKind` is a bit more complex:
   ```java
   var jobKind =
           Optional.ofNullable(getResource().getSpec().getJobReference())
                   .map(JobReference::getKind)
                   .orElse(null);
   
   if (JobKind.FLINK_DEPLOYMENT.equals(jobKind)) {
       return getJosdkContext()
               .getSecondaryResource(FlinkDeployment.class)
               .map(r -> r);
   }
   
   if (JobKind.FLINK_SESSION_JOB.equals(jobKind)) {
       return getJosdkContext()
               .getSecondaryResource(FlinkSessionJob.class)
               .map(r -> r);
   }
   
   return Optional.empty();
   ```



##########
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java:
##########
@@ -35,7 +35,10 @@
 @AllArgsConstructor
 @Builder
 public class SavepointInfo implements SnapshotInfo {
-    /** Last completed savepoint by the operator. */
+    /**
+     * Last completed savepoint by the operator for manual and periodic 
snapshots. Only used if

Review Comment:
   Shouldn't this be marked as `@Deprecated` as well?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.JobKind;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+import java.util.Optional;
+
+/** Context for reconciling a snapshot. */
+@Getter
+@RequiredArgsConstructor
+public class FlinkStateSnapshotContext {
+
+    private final FlinkStateSnapshot resource;
+    private final FlinkStateSnapshotStatus originalStatus;
+    private final Context<FlinkStateSnapshot> josdkContext;
+    private final FlinkConfigManager configManager;
+
+    private FlinkOperatorConfiguration operatorConfig;
+    private Configuration referencedJobObserveConfig;
+    private FlinkDeployment referencedJobFlinkDeployment;
+
+    /**
+     * @return Operator configuration for this resource.
+     */
+    public FlinkOperatorConfiguration getOperatorConfig() {
+        if (operatorConfig != null) {
+            return operatorConfig;
+        }
+        return operatorConfig =
+                configManager.getOperatorConfiguration(
+                        getResource().getMetadata().getNamespace(), null);

Review Comment:
   IMO inverting the current `if` condition creates a more natural and readable 
flow with a single `return`, so I would consider that an improvement.
   
   But Lombok's [lazy getter](https://projectlombok.org/features/GetterLazy) 
would be even better for all currently non-final fields, which could be 
`final` in that case, and we could eliminate the null-checks from the source.
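
   A plain-Java sketch of the inverted `if` variant, assuming a hypothetical 
`Supplier` in place of the `configManager` lookup and a `String` in place of 
`FlinkOperatorConfiguration`:
   ```java
   import java.util.function.Supplier;

   // Hypothetical stand-in for FlinkStateSnapshotContext (assumption).
   class LazyConfigSketch {

       private final Supplier<String> loader;
       private String operatorConfig;

       LazyConfigSketch(Supplier<String> loader) {
           this.loader = loader;
       }

       /** Loads the value on first access, then reuses the cached one. */
       String getOperatorConfig() {
           if (operatorConfig == null) {
               operatorConfig = loader.get();
           }
           return operatorConfig;
       }
   }
   ```
   Like the current code, this is not thread-safe, which is one more argument 
for the Lombok variant if these getters can be reached from several threads.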



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
+import 
org.apache.flink.kubernetes.operator.observer.snapshot.StateSnapshotObserver;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import 
org.apache.flink.kubernetes.operator.reconciler.snapshot.StateSnapshotReconciler;
+import 
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
+import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
+
+import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** Controller that runs the main reconcile loop for {@link 
FlinkStateSnapshot}. */
+@ControllerConfiguration
+public class FlinkStateSnapshotController
+        implements Reconciler<FlinkStateSnapshot>,
+                ErrorStatusHandler<FlinkStateSnapshot>,
+                EventSourceInitializer<FlinkStateSnapshot>,
+                Cleaner<FlinkStateSnapshot> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkStateSnapshotController.class);
+
+    private final Set<FlinkResourceValidator> validators;
+    private final FlinkResourceContextFactory ctxFactory;
+    private final StateSnapshotReconciler reconciler;
+    private final StateSnapshotObserver observer;
+    private final EventRecorder eventRecorder;
+    private final StatusRecorder<FlinkStateSnapshot, FlinkStateSnapshotStatus> 
statusRecorder;
+
+    public FlinkStateSnapshotController(

Review Comment:
   Ctor can be replaced with `@RequiredArgsConstructor`.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -226,7 +229,27 @@ private void setJobIdIfNecessary(
     @Override
     protected void cancelJob(FlinkResourceContext<FlinkDeployment> ctx, 
UpgradeMode upgradeMode)
             throws Exception {
-        ctx.getFlinkService().cancelJob(ctx.getResource(), upgradeMode, 
ctx.getObserveConfig());
+        var conf = ctx.getObserveConfig() != null ? ctx.getObserveConfig() : 
new Configuration();
+        var savepointFormatType =
+                
conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
+
+        var savepointOpt = ctx.getFlinkService().cancelJob(ctx.getResource(), 
upgradeMode, conf);
+        savepointOpt.ifPresent(

Review Comment:
   The body of this optional could be moved to `AbstractJobReconciler` as a new 
protected method, so we duplicate as little logic as possible.
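
   Roughly what I have in mind, as a sketch only. The body of the lambda is not 
visible in this hunk, so the classes, the `recordSavepoint` name and the 
savepoint path below are all hypothetical:
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Optional;

   // Hypothetical stand-in for AbstractJobReconciler (assumption).
   abstract class AbstractReconcilerSketch {

       final List<String> recorded = new ArrayList<>();

       /** Shared handling of the optional savepoint path after a cancel. */
       protected void recordSavepoint(Optional<String> savepointOpt) {
           savepointOpt.ifPresent(recorded::add);
       }

       abstract void cancelJob();
   }

   class ApplicationReconcilerSketch extends AbstractReconcilerSketch {
       @Override
       void cancelJob() {
           // Made-up path standing in for the cancelJob result.
           recordSavepoint(Optional.of("file:///tmp/savepoint-1"));
       }
   }

   class SessionJobReconcilerSketch extends AbstractReconcilerSketch {
       @Override
       void cancelJob() {
           // A cancel without a savepoint leaves nothing to record.
           recordSavepoint(Optional.empty());
       }
   }
   ```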



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java:
##########
@@ -273,15 +280,19 @@ private void disposeSavepointQuietly(
         }
     }
 
-    private void observeLatestSavepoint(
+    private void observeLatestCheckpoint(

Review Comment:
   Is this always a checkpoint? Shouldn't this be more like 
`observeLatestSnapshot`? Also, we should update the error log at L297.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/snapshot/StateSnapshotObserver.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer.snapshot;
+
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotState;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
+import 
org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotContext;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.CheckpointFetchResult;
+import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
+import 
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The observer of {@link 
org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot}. */
+@RequiredArgsConstructor
+public class StateSnapshotObserver {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(StateSnapshotObserver.class);
+
+    private final FlinkResourceContextFactory ctxFactory;
+    private final EventRecorder eventRecorder;
+
+    public void observe(FlinkStateSnapshotContext ctx) {
+        var resource = ctx.getResource();
+        var savepointState = resource.getStatus().getState();
+
+        if (FlinkStateSnapshotState.IN_PROGRESS.equals(savepointState)) {
+            observeSnapshotState(ctx);
+        }
+    }
+
+    private void observeSnapshotState(FlinkStateSnapshotContext ctx) {
+        var resource = ctx.getResource();
+        var resourceName = resource.getMetadata().getName();
+        var triggerId = resource.getStatus().getTriggerId();
+
+        if (StringUtils.isEmpty(triggerId)) {
+            return;
+        }
+
+        LOG.debug("Observing snapshot state for resource {}...", resourceName);
+
+        if (FlinkStateSnapshotUtils.abandonSnapshotIfJobNotRunning(
+                ctx.getKubernetesClient(),
+                ctx.getResource(),
+                ctx.getSecondaryResource().orElse(null),
+                eventRecorder)) {
+            return;
+        }
+
+        var jobId = 
ctx.getSecondaryResource().orElseThrow().getStatus().getJobStatus().getJobId();
+        var ctxFlinkDeployment =
+                ctxFactory.getResourceContext(
+                        ctx.getReferencedJobFlinkDeployment(), 
ctx.getJosdkContext());
+        var observeConfig = ctx.getReferencedJobObserveConfig();
+
+        if (resource.getSpec().isSavepoint()) {
+            var savepointInfo =
+                    ctxFlinkDeployment
+                            .getFlinkService()
+                            .fetchSavepointInfo(triggerId, jobId, 
observeConfig);
+            handleSavepoint(ctx, savepointInfo);
+        } else {
+            var checkpointInfo =
+                    ctxFlinkDeployment
+                            .getFlinkService()
+                            .fetchCheckpointInfo(triggerId, jobId, 
observeConfig);
+            handleCheckpoint(ctx, checkpointInfo, ctxFlinkDeployment, jobId);
+        }
+    }
+
+    private void handleSavepoint(
+            FlinkStateSnapshotContext ctx, SavepointFetchResult savepointInfo) 
{
+        var resource = ctx.getResource();
+        var resourceName = resource.getMetadata().getName();
+
+        if (savepointInfo.isPending()) {
+            LOG.debug(
+                    "Savepoint '{}' with ID {} is pending",
+                    resourceName,
+                    resource.getStatus().getTriggerId());
+        } else if (savepointInfo.getError() != null) {
+            throw new ReconciliationException(savepointInfo.getError());
+        } else {
+            LOG.info("Savepoint {} successful: {}", resourceName, 
savepointInfo.getLocation());
+            FlinkStateSnapshotUtils.snapshotSuccessful(
+                    resource, savepointInfo.getLocation(), false);
+        }
+    }
+
+    private void handleCheckpoint(
+            FlinkStateSnapshotContext ctx,
+            CheckpointFetchResult checkpointInfo,
+            FlinkResourceContext<FlinkDeployment> ctxFlinkDeployment,
+            String jobId) {
+        var resource = ctx.getResource();
+        var resourceName = resource.getMetadata().getName();
+
+        if (checkpointInfo.isPending()) {
+            LOG.debug(
+                    "Checkpoint for {} with ID {} is pending",
+                    resourceName,
+                    resource.getStatus().getTriggerId());
+            return;
+        }
+
+        if (checkpointInfo.getError() != null) {
+            throw new ReconciliationException(checkpointInfo.getError());
+        } else {

Review Comment:
   nit: `else` can be omitted because the `if` branch throws, so this fairly 
big block can save one indentation level.
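
   A small sketch of the flattened flow. The return values and the exception 
type are placeholders, not the PR's actual `ReconciliationException` handling:
   ```java
   // Hypothetical stand-in for the handleCheckpoint flow (assumption).
   class CheckpointResultSketch {

       static String handle(boolean pending, String error, String location) {
           if (pending) {
               return "PENDING";
           }
           if (error != null) {
               throw new IllegalStateException(error);
           }
           // No else needed: the branch above always throws.
           return "COMPLETED:" + location;
       }
   }
   ```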



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java:
##########
@@ -53,6 +62,16 @@ private static String format(@NonNull CommonStatus<?> 
status) {
                         : status.getError());
     }
 
+    private static String format(@NonNull FlinkStateSnapshotStatus status) {
+        if (StringUtils.isEmpty(status.getError())) {
+            return String.format(
+                    ">>> Status[Snapshot] | Info | %s | %s", 
status.getState(), status.getPath());

Review Comment:
   I think we should keep the field lengths of the log line consistent, just as 
the existing `format` methods do.
   
   Also, now that we have different kinds of `Status` logs, it would make sense 
to identify the job status changes as well. Calling it `Status[Job]` seems 
appropriate, but we should make sure we define a correct `Status[%-8s]` 
template or something like that.
   
   And as far as I can see, the `Event` logs also aim for the same field 
length as the status logs, so we should probably try to keep a completely 
consistent format.
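
   To show what the `%-8s` padding would give us, a minimal sketch. The helper 
class and the sample values are made up; only the `>>> Status[...] | Info` 
prefix is taken from the diff:
   ```java
   // Hypothetical helper, not part of AuditUtils (assumption).
   class AuditFormatSketch {

       /** Left-aligns the kind in 8 columns so the prefixes line up. */
       static String status(String kind, String state, String detail) {
           return String.format(
                   ">>> Status[%-8s] | Info | %s | %s", kind, state, detail);
       }
   }
   ```
   With this template `Snapshot` fills the eight columns exactly and `Job` is 
padded with five spaces, so both prefixes end at the same column.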



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkStateSnapshotMetrics.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** FlinkSessionJob metrics. */

Review Comment:
   `FlinkSessionJob` -> `FlinkStateSnapshot`. Same for the field level variable 
and the `onUpdate`/`onRemove` param. Copy-paste residue. :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
