gyfora commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1667372364


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -222,9 +235,12 @@ private void updateStatusBeforeFirstDeployment(
             CR cr, SPEC spec, Configuration deployConfig, STATUS status, 
KubernetesClient client) {
         if (spec.getJob() != null) {
             var initialUpgradeMode = UpgradeMode.STATELESS;
+            var snapshotRef = spec.getJob().getFlinkStateSnapshotReference();
             var initialSp = spec.getJob().getInitialSavepointPath();
 
-            if (initialSp != null) {
+            if (snapshotRef != null) {
+                initialUpgradeMode = UpgradeMode.SAVEPOINT;
+            } else if (initialSp != null) {
                 status.getJobStatus()
                         .getSavepointInfo()
                         .setLastSavepoint(Savepoint.of(initialSp, 
SnapshotTriggerType.UNKNOWN));

Review Comment:
   Shouldn't the last savepoint info also be updated when the snapshot ref is provided? This logic exists to handle failures before the initial startup, so it should apply to both cases.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/snapshot/StateSnapshotReconciler.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.snapshot;
+
+import org.apache.flink.autoscaler.utils.DateTimeUtils;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
+import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotState;
+import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
+import 
org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotContext;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import 
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.util.Preconditions;
+
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.ObjectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.Optional;
+
+/** The reconciler for the {@link 
org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot}. */
+@RequiredArgsConstructor
+public class StateSnapshotReconciler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(StateSnapshotReconciler.class);
+
+    private final FlinkResourceContextFactory ctxFactory;
+
+    public void reconcile(FlinkStateSnapshotContext ctx) throws Exception {
+        var source = ctx.getResource().getSpec().getJobReference();
+        var resource = ctx.getResource();
+
+        var savepointState = resource.getStatus().getState();
+        if (!FlinkStateSnapshotState.TRIGGER_PENDING.equals(savepointState)) {
+            return;
+        }
+
+        if (resource.getSpec().isSavepoint()
+                && resource.getSpec().getSavepoint().getAlreadyExists()) {
+            LOG.info(
+                    "Snapshot {} is marked as completed in spec, skipping 
triggering savepoint.",
+                    resource.getMetadata().getName());
+            resource.getStatus().setState(FlinkStateSnapshotState.COMPLETED);
+            
resource.getStatus().setPath(resource.getSpec().getSavepoint().getPath());
+            var time = DateTimeUtils.kubernetes(Instant.now());
+            resource.getStatus().setTriggerTimestamp(time);
+            resource.getStatus().setResultTimestamp(time);
+            return;
+        }
+
+        var secondaryResource =
+                ctx.getSecondaryResource()
+                        .orElseThrow(
+                                () ->
+                                        new RuntimeException(
+                                                String.format(
+                                                        "Secondary resource %s 
not found",
+                                                        source)));

Review Comment:
   Should this just abandon the snapshot instead of throwing an exception?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -329,18 +459,26 @@ private void redeployWithSavepoint(
             throws Exception {
         LOG.info("Redeploying from savepoint");
         cancelJob(ctx, UpgradeMode.STATELESS);
-        var savepoint = currentDeploySpec.getJob().getInitialSavepointPath();
+        var snapshotRef = 
currentDeploySpec.getJob().getFlinkStateSnapshotReference();
         currentDeploySpec.getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+
+        var path = currentDeploySpec.getJob().getInitialSavepointPath();
+        if (snapshotRef != null && snapshotRef.getName() != null) {
+            path =
+                    
FlinkStateSnapshotUtils.getAndValidateFlinkStateSnapshotPath(
+                            ctx.getKubernetesClient(), snapshotRef);
+        }
+
         status.getJobStatus()
                 .getSavepointInfo()
-                .setLastSavepoint(Savepoint.of(savepoint, 
SnapshotTriggerType.UNKNOWN));
+                .setLastSavepoint(Savepoint.of(path, 
SnapshotTriggerType.UNKNOWN));

Review Comment:
   See my other long comment (on the savepoint lookup in `restoreJob`).



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -266,19 +304,31 @@ protected void restoreJob(
         Optional<String> savepointOpt = Optional.empty();
 
         if (spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
-            savepointOpt =
-                    Optional.ofNullable(
-                                    ctx.getResource()
-                                            .getStatus()
-                                            .getJobStatus()
-                                            .getSavepointInfo()
-                                            .getLastSavepoint())
-                            .flatMap(s -> 
Optional.ofNullable(s.getLocation()));
+            if (FlinkStateSnapshotUtils.shouldCreateSnapshotResource(
+                    ctx.getOperatorConfig(), deployConfig)) {
+                savepointOpt = 
getLatestSavepointPathFromFlinkStateSnapshots(ctx);
+            } else {
+                savepointOpt =
+                        Optional.ofNullable(
+                                        ctx.getResource()
+                                                .getStatus()
+                                                .getJobStatus()
+                                                .getSavepointInfo()
+                                                .getLastSavepoint())
+                                .flatMap(s -> 
Optional.ofNullable(s.getLocation()));

Review Comment:
   The problem with this logic is that the snapshotReference field allows the path to be specified directly. If the user starts the job that way, we don't update the lastSavepoint info, and we also won't find that path here. As a result, we will simply restart from empty state in those cases, which is not what should happen.
   
   There are two main ways to solve this:
    1. We keep the current lastSavepoint logic in the status and update it whenever a snapshot is completed. This is a bit problematic because we would have to update the status of the FlinkDeployment/SessionJob from the snapshot controller, which is awkward.
    2. Introduce a new field in the status to capture the savepoint reference used during a SAVEPOINT deployment. I think this is the way to go, because this is the functionality that is important to capture. When we submit with initialSavepointPath for the first time, or perform a savepoint redeploy or a savepoint upgrade, the lastReconciledSpec records `upgradeMode: SAVEPOINT`, and it is assumed that lastSavepoint contains the savepoint that was used. Now that we are getting rid of lastSavepoint, we should put something more meaningful in the status to capture the actual upgrade-related behaviour. One possibility is simply to use the `job.snapshotReference` field in the lastReconciledSpec to record the upgrade savepoint, and to clear it in all other cases (the same way we reuse the upgradeMode field in the lastReconciledSpec to record how we actually stopped the job).
     



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -226,7 +230,36 @@ private void setJobIdIfNecessary(
     @Override
     protected void cancelJob(FlinkResourceContext<FlinkDeployment> ctx, 
UpgradeMode upgradeMode)
             throws Exception {
-        ctx.getFlinkService().cancelJob(ctx.getResource(), upgradeMode, 
ctx.getObserveConfig());
+        var conf = ctx.getObserveConfig() != null ? ctx.getObserveConfig() : 
new Configuration();
+        var savepointFormatType =
+                
conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
+
+        var savepointOpt = ctx.getFlinkService().cancelJob(ctx.getResource(), 
upgradeMode, conf);
+        savepointOpt.ifPresent(
+                location -> {
+                    if (FlinkStateSnapshotUtils.shouldCreateSnapshotResource(
+                            ctx.getOperatorConfig(), conf)) {
+                        FlinkStateSnapshotUtils.createUpgradeSavepointResource(
+                                ctx.getKubernetesClient(),
+                                ctx.getResource(),
+                                location,
+                                
SavepointFormatType.valueOf(savepointFormatType.name()),
+                                conf.get(
+                                        KubernetesOperatorConfigOptions
+                                                
.OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE));
+                    } else {
+                        Savepoint sp =
+                                Savepoint.of(
+                                        location,
+                                        SnapshotTriggerType.UPGRADE,
+                                        
SavepointFormatType.valueOf(savepointFormatType.name()));
+                        ctx.getResource()
+                                .getStatus()
+                                .getJobStatus()
+                                .getSavepointInfo()
+                                .updateLastSavepoint(sp);

Review Comment:
   See my other long comment (on the savepoint lookup in `restoreJob`).



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/snapshot/StateSnapshotObserver.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer.snapshot;
+
+import org.apache.flink.autoscaler.utils.DateTimeUtils;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotState;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
+import 
org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotContext;
+import org.apache.flink.kubernetes.operator.observer.CheckpointFetchResult;
+import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import 
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+
+/** The observer of {@link 
org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot}. */
+@RequiredArgsConstructor
+public class StateSnapshotObserver {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(StateSnapshotObserver.class);
+
+    private final FlinkResourceContextFactory ctxFactory;
+    private final EventRecorder eventRecorder;
+
+    public void observe(FlinkStateSnapshotContext ctx) {
+        var resource = ctx.getResource();
+        var savepointState = resource.getStatus().getState();
+
+        if (FlinkStateSnapshotState.IN_PROGRESS.equals(savepointState)) {
+            observeSavepointState(ctx);
+        }
+    }
+
+    private void observeSavepointState(FlinkStateSnapshotContext ctx) {
+        var resource = ctx.getResource();
+        var resourceName = resource.getMetadata().getName();
+        var triggerId = resource.getStatus().getTriggerId();
+
+        if (StringUtils.isEmpty(triggerId)) {
+            return;
+        }
+
+        LOG.debug("Observing savepoint state for resource {}...", 
resourceName);
+        var secondaryResource =
+                ctx.getSecondaryResource()
+                        .orElseThrow(
+                                () ->
+                                        new RuntimeException(
+                                                String.format(
+                                                        "Secondary resource %s 
for savepoint %s was not found",
+                                                        
resource.getSpec().getJobReference(),
+                                                        resourceName)));

Review Comment:
   If the target can no longer be found, that sounds similar to the not-running case to me, and the snapshot should probably be marked as abandoned. What is your thinking here?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/snapshot/StateSnapshotReconciler.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.snapshot;
+
+import org.apache.flink.autoscaler.utils.DateTimeUtils;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
+import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotState;
+import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
+import 
org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotContext;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import 
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.util.Preconditions;
+
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.ObjectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.Optional;
+
+/** The reconciler for the {@link 
org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot}. */
+@RequiredArgsConstructor
+public class StateSnapshotReconciler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(StateSnapshotReconciler.class);
+
+    private final FlinkResourceContextFactory ctxFactory;
+
+    public void reconcile(FlinkStateSnapshotContext ctx) throws Exception {
+        var source = ctx.getResource().getSpec().getJobReference();
+        var resource = ctx.getResource();
+
+        var savepointState = resource.getStatus().getState();
+        if (!FlinkStateSnapshotState.TRIGGER_PENDING.equals(savepointState)) {
+            return;
+        }
+
+        if (resource.getSpec().isSavepoint()
+                && resource.getSpec().getSavepoint().getAlreadyExists()) {
+            LOG.info(
+                    "Snapshot {} is marked as completed in spec, skipping 
triggering savepoint.",
+                    resource.getMetadata().getName());
+            resource.getStatus().setState(FlinkStateSnapshotState.COMPLETED);
+            
resource.getStatus().setPath(resource.getSpec().getSavepoint().getPath());
+            var time = DateTimeUtils.kubernetes(Instant.now());
+            resource.getStatus().setTriggerTimestamp(time);
+            resource.getStatus().setResultTimestamp(time);
+            return;
+        }
+
+        var secondaryResource =
+                ctx.getSecondaryResource()
+                        .orElseThrow(
+                                () ->
+                                        new RuntimeException(
+                                                String.format(
+                                                        "Secondary resource %s 
not found",
+                                                        source)));
+        if (!ReconciliationUtils.isJobRunning(secondaryResource.getStatus())) {
+            LOG.warn(
+                    "Target job {} for savepoint {} is not running, cannot 
trigger snapshot.",
+                    secondaryResource.getMetadata().getName(),
+                    resource.getMetadata().getName());
+            return;
+        }

Review Comment:
   We have already observed and abandoned the snapshot in this case, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to