This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new d9282263 [FLINK-36162] Remove flinkStateSnapshotReference and 
namespace from FlinkStateSnapshot jobReference
d9282263 is described below

commit d9282263baad879a6a00f9614d985a0e1a47740c
Author: Gyula Fora <[email protected]>
AuthorDate: Wed Aug 28 11:22:30 2024 +0200

    [FLINK-36162] Remove flinkStateSnapshotReference and namespace from 
FlinkStateSnapshot jobReference
---
 docs/content/docs/concepts/controller-flow.md      |   2 +-
 .../content/docs/custom-resource/job-management.md |   9 +-
 docs/content/docs/custom-resource/reference.md     |  17 +--
 docs/content/docs/custom-resource/snapshots.md     |  10 +-
 docs/content/docs/operations/upgrade.md            |   7 +-
 e2e-tests/test_snapshot.sh                         |  10 +-
 examples/snapshot/job-from-savepoint.yaml          |   4 +-
 .../api/spec/FlinkStateSnapshotReference.java      |  58 ---------
 .../kubernetes/operator/api/spec/JobReference.java |   9 +-
 .../kubernetes/operator/api/spec/JobSpec.java      |  13 +--
 .../kubernetes/operator/api/status/JobStatus.java  |   3 +-
 .../operator/observer/SnapshotObserver.java        |   6 +-
 .../AbstractFlinkResourceReconciler.java           |  24 +---
 .../deployment/AbstractJobReconciler.java          |  68 ++++-------
 .../deployment/ApplicationReconciler.java          |   7 +-
 .../sessionjob/SessionJobReconciler.java           |   2 +-
 .../operator/utils/EventSourceUtils.java           |   6 +-
 .../operator/utils/FlinkStateSnapshotUtils.java    |  88 ++------------
 .../kubernetes/operator/utils/SnapshotUtils.java   |  30 ++---
 .../operator/validation/DefaultValidator.java      |  11 +-
 .../controller/FlinkDeploymentControllerTest.java  |  15 +--
 .../controller/FlinkSessionJobControllerTest.java  |  18 +--
 .../FlinkStateSnapshotControllerTest.java          |  12 +-
 .../deployment/ApplicationObserverTest.java        |   4 +-
 .../deployment/ApplicationReconcilerTest.java      |  23 ++--
 .../ApplicationReconcilerUpgradeModeTest.java      |  72 +++---------
 .../sessionjob/SessionJobReconcilerTest.java       |  24 ++--
 .../operator/service/AbstractFlinkServiceTest.java |   4 +-
 .../utils/FlinkStateSnapshotUtilsTest.java         | 129 ++-------------------
 .../operator/utils/SnapshotUtilsTest.java          |  13 +--
 .../operator/validation/DefaultValidatorTest.java  |  22 +---
 .../crds/flinkdeployments.flink.apache.org-v1.yml  |  20 +---
 .../crds/flinksessionjobs.flink.apache.org-v1.yml  |  20 +---
 .../flinkstatesnapshots.flink.apache.org-v1.yml    |   2 -
 34 files changed, 149 insertions(+), 613 deletions(-)

diff --git a/docs/content/docs/concepts/controller-flow.md 
b/docs/content/docs/concepts/controller-flow.md
index 8e8d84c0..b6d876ba 100644
--- a/docs/content/docs/concepts/controller-flow.md
+++ b/docs/content/docs/concepts/controller-flow.md
@@ -98,7 +98,7 @@ It’s very important to understand that the Observer phase 
records a point-in-t
 The `AbstractFlinkResourceReconciler` encapsulates the core reconciliation 
flow for all Flink resource types. Let’s take a look at the high level flow 
before we go into specifics for session, application and session job resources.
 
 1. Check if the resource is ready for reconciliation or if there are any 
pending operations that should not be interrupted (manual savepoints for 
example)
-2. If this is the first deployment attempt for the resource, we simply deploy 
it. It’s important to note here that this is the only deploy operation where we 
use the `flinkStateSnapshotReference` provided in the spec.
+2. If this is the first deployment attempt for the resource, we simply deploy 
it. It’s important to note here that this is the only deploy operation where we 
use the `initialSavepointPath` provided in the spec.
 3. Next we determine if the desired spec changed and the type of change: 
`IGNORE, SCALE, UPGRADE`. Only for scale and upgrade type changes do we need to 
execute further reconciliation logic.
 4. If we have upgrade/scale spec changes we execute the upgrade logic specific 
for the resource type
 5. If we did not receive any spec change we still have to ensure that the 
currently deployed resources are fully reconciled:
diff --git a/docs/content/docs/custom-resource/job-management.md 
b/docs/content/docs/custom-resource/job-management.md
index 790c825a..231f92a8 100644
--- a/docs/content/docs/custom-resource/job-management.md
+++ b/docs/content/docs/custom-resource/job-management.md
@@ -247,17 +247,16 @@ Users have two options to restore a job from a target 
savepoint / checkpoint
 
 ### Redeploy using the savepointRedeployNonce
 
-It is possible to redeploy a `FlinkDeployment` or `FlinkSessionJob` resource 
from a target savepoint by using the combination of `savepointRedeployNonce` 
and `flinkStateSnapshotReference` in the job spec:
+It is possible to redeploy a `FlinkDeployment` or `FlinkSessionJob` resource 
from a target savepoint by using the combination of `savepointRedeployNonce` 
and `initialSavepointPath` in the job spec:
 
 ```yaml
  job:
-   flinkStateSnapshotReference:
-     path: file://redeploy-target-savepoint
+   initialSavepointPath: file://redeploy-target-savepoint
    # If not set previously, set to 1, otherwise increment, e.g. 2
    savepointRedeployNonce: 1
 ```
 
-When changing the `savepointRedeployNonce` the operator will redeploy the job 
to the savepoint defined in the `flinkStateSnapshotReference`. The savepoint 
path must not be empty. 
+When changing the `savepointRedeployNonce` the operator will redeploy the job 
to the savepoint defined in the `initialSavepointPath`. The savepoint path must 
not be empty. 
 
 {{< hint warning >}}
 Rollbacks are not supported after redeployments.
@@ -271,7 +270,7 @@ However, this also means that savepoint history is lost and 
the operator won't c
  1. Locate the latest checkpoint/savepoint metafile in your configured 
checkpoint/savepoint directory.
  2. Delete the `FlinkDeployment` resource for your application
  3. Check that you have the current savepoint, and that your `FlinkDeployment` 
is deleted completely
- 4. Modify your `FlinkDeployment` JobSpec and set 
`flinkStateSnapshotReference.path` to your last checkpoint location
+ 4. Modify your `FlinkDeployment` JobSpec and set `initialSavepointPath` to 
your last checkpoint location
  5. Recreate the deployment
 
 These steps ensure that the operator will start completely fresh from the user 
defined savepoint path and can hopefully fully recover.
diff --git a/docs/content/docs/custom-resource/reference.md 
b/docs/content/docs/custom-resource/reference.md
index 3348f46e..12e1d88c 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -90,17 +90,6 @@ This serves as a full reference for FlinkDeployment and 
FlinkSessionJob custom r
 | flinkConfiguration | java.util.Map<java.lang.String,java.lang.String> | 
Flink configuration overrides for the Flink deployment or Flink session job. |
 | deploymentName | java.lang.String | The name of the target session cluster 
deployment. |
 
-### FlinkStateSnapshotReference
-**Class**: 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference
-
-**Description**: Reference for a FlinkStateSnapshot.
-
-| Parameter | Type | Docs |
-| ----------| ---- | ---- |
-| namespace | java.lang.String | Namespace of the snapshot resource. |
-| name | java.lang.String | Name of the snapshot resource. |
-| path | java.lang.String | If a path is given, all other fields will be 
ignored, and this will be used as the initial savepoint path. |
-
 ### FlinkStateSnapshotSpec
 **Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec
 
@@ -172,7 +161,6 @@ This serves as a full reference for FlinkDeployment and 
FlinkSessionJob custom r
 | ----------| ---- | ---- |
 | kind | org.apache.flink.kubernetes.operator.api.spec.JobKind | Kind of the 
Flink resource, FlinkDeployment or FlinkSessionJob. |
 | name | java.lang.String | Name of the Flink resource. |
-| namespace | java.lang.String | Namespace of the Flink resource. If empty, 
the operator will use the namespace of the snapshot. |
 
 ### JobSpec
 **Class**: org.apache.flink.kubernetes.operator.api.spec.JobSpec
@@ -188,11 +176,10 @@ This serves as a full reference for FlinkDeployment and 
FlinkSessionJob custom r
 | state | org.apache.flink.kubernetes.operator.api.spec.JobState | Desired 
state for the job. |
 | savepointTriggerNonce | java.lang.Long | Nonce used to manually trigger 
savepoint for the running job. In order to trigger a savepoint, change the 
number to a different non-null value. |
 | initialSavepointPath | java.lang.String | Savepoint path used by the job the 
first time it is deployed or during savepoint redeployments (triggered by 
changing the savepointRedeployNonce). |
-| flinkStateSnapshotReference | 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference | 
Snapshot reference used by the job the first time it is deployed or during 
savepoint redeployments (triggered by changing the savepointRedeployNonce). |
 | checkpointTriggerNonce | java.lang.Long | Nonce used to manually trigger 
checkpoint for the running job. In order to trigger a checkpoint, change the 
number to a different non-null value. |
 | upgradeMode | org.apache.flink.kubernetes.operator.api.spec.UpgradeMode | 
Upgrade mode of the Flink job. |
 | allowNonRestoredState | java.lang.Boolean | Allow checkpoint state that 
cannot be mapped to any job vertex in tasks. |
-| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full 
redeployment of the job from the savepoint path specified in 
initialSavepointPath or the path/FlinkStateSnapshot reference in 
flinkStateSnapshotReference. In order to trigger redeployment, change the 
number to a different non-null value. Rollback is not possible after 
redeployment. |
+| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full 
redeployment of the job from the savepoint path specified in 
initialSavepointPath. In order to trigger redeployment, change the number to a 
different non-null value. Rollback is not possible after redeployment. |
 
 ### JobState
 **Class**: org.apache.flink.kubernetes.operator.api.spec.JobState
@@ -418,7 +405,7 @@ This serves as a full reference for FlinkDeployment and 
FlinkSessionJob custom r
 | state | java.lang.String | Last observed state of the job. |
 | startTime | java.lang.String | Start time of the job. |
 | updateTime | java.lang.String | Update time of the job. |
-| upgradeSnapshotReference | 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference |  |
+| upgradeSavepointPath | java.lang.String |  |
 | savepointInfo | 
org.apache.flink.kubernetes.operator.api.status.SavepointInfo | Information 
about pending and last savepoint for the job. |
 | checkpointInfo | 
org.apache.flink.kubernetes.operator.api.status.CheckpointInfo | Information 
about pending and last checkpoint for the job. |
 
diff --git a/docs/content/docs/custom-resource/snapshots.md 
b/docs/content/docs/custom-resource/snapshots.md
index e0d39835..3c1771bc 100644
--- a/docs/content/docs/custom-resource/snapshots.md
+++ b/docs/content/docs/custom-resource/snapshots.md
@@ -38,7 +38,7 @@ If you set this to false, the operator will keep using the 
deprecated status fie
 To create a savepoint or checkpoint, exactly one of the spec fields 
`savepoint` or `checkpoint` must present. 
 Furthermore, in case of a savepoint you can signal to the operator that the 
savepoint already exists using the `alreadyExists` field, and the operator will 
mark it as a successful snapshot in the next reconciliation phase.
 
-You can also instruct the Operator to start a new 
FlinkDeployment/FlinkSessionJob from an existing snapshot CR by using 
`flinkStateSnapshotReference` in the job spec.
+You can also instruct the Operator to start a new 
FlinkDeployment/FlinkSessionJob from an existing snapshot by using 
`initialSavepointPath` in the job spec.
 
 ## Examples
 
@@ -78,11 +78,11 @@ spec:
 
 ### Start job from existing snapshot
 
+To start a job from an existing snapshot, you need to extract the path then 
use:
+
 ```yaml
  job:
-   flinkStateSnapshotReference:
-     namespace: flink  # not required if it's in the same namespace
-     name: example-savepoint
+   initialSavepointPath: [savepoint_path]
 ```
 
 {{< hint warning >}}
@@ -131,7 +131,7 @@ This feature is not available for checkpoints.
 ## Triggering snapshots
 
 Upgrade savepoints are triggered automatically by the system during the 
upgrade process as we have seen in the previous sections.
-In this case, the savepoint path will also be recorded in the 
`upgradeSnapshotReference` job status field, which the operator will use when 
restarting the job.
+In this case, the savepoint path will also be recorded in the 
`upgradeSavepointPath` job status field, which the operator will use when 
restarting the job.
 
 For backup, job forking and other purposes savepoint and checkpoints can be 
triggered manually or periodically by the operator, however generally speaking 
these will not be used during upgrades and are not required for the correct 
operation.
 
diff --git a/docs/content/docs/operations/upgrade.md 
b/docs/content/docs/operations/upgrade.md
index 65f188b4..e9b7ec92 100644
--- a/docs/content/docs/operations/upgrade.md
+++ b/docs/content/docs/operations/upgrade.md
@@ -148,20 +148,19 @@ Here is a reference example of upgrading a 
`basic-checkpoint-ha-example` deploym
     ```
 5. Restore the job:
 
-   Deploy the previously deleted job using this 
[FlinkDeployemnt](https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml)
 with `v1beta1` and explicitly set the `job.flinkStateSnapshotReference.path` 
to the savepoint location obtained from the step 1.
+   Deploy the previously deleted job using this 
[FlinkDeployemnt](https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml)
 with `v1beta1` and explicitly set the `job.initialSavepointPath` to the 
savepoint location obtained from the step 1.
 
     ```
     spec:
       ...
       job:
-        flinkStateSnapshotReference: 
-          path: /flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata
+        initialSavepointPath: 
/flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata
         upgradeMode: savepoint
       ...
     ```
     Alternatively, we may use this command to edit and deploy the manifest:
     ```sh
-    wget -qO - 
https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml|
 yq w - "spec.job.flinkStateSnapshotReference.path" 
"/flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata"| kubectl apply 
-f -
+    wget -qO - 
https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml|
 yq w - "spec.job.initialSavepointPath" 
"/flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata"| kubectl apply 
-f -
     ```
    Finally, verify that `deploy/basic-checkpoint-ha-example` log has:
     ```
diff --git a/e2e-tests/test_snapshot.sh b/e2e-tests/test_snapshot.sh
index 8d080465..32006ddb 100755
--- a/e2e-tests/test_snapshot.sh
+++ b/e2e-tests/test_snapshot.sh
@@ -56,10 +56,10 @@ wait_for_status $APPLICATION_IDENTIFIER 
'.status.jobStatus.state' RUNNING ${TIME
 kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch 
'{"spec":{"job":{"state": "suspended"}}}'
 wait_for_status $APPLICATION_IDENTIFIER '.status.lifecycleState' "SUSPENDED" 
${TIMEOUT} || exit 1
 
-location=$(kubectl get $APPLICATION_IDENTIFIER -o yaml | yq 
'.status.jobStatus.upgradeSnapshotReference.path')
+location=$(kubectl get $APPLICATION_IDENTIFIER -o yaml | yq 
'.status.jobStatus.upgradeSavepointPath')
 if [ "$location" == "" ]; then echo "Legacy savepoint location was empty"; 
exit 1; fi
-echo "Removing upgradeSnapshotReference and setting lastSavepoint"
-kubectl patch flinkdep ${CLUSTER_ID} --type=merge --subresource status --patch 
'{"status":{"jobStatus":{"upgradeSnapshotReference":null,"savepointInfo":{"lastSavepoint":{"timeStamp":
 0, "location": "'$location'", "triggerNonce": 0}}}}}'
+echo "Removing upgradeSavepointPath and setting lastSavepoint"
+kubectl patch flinkdep ${CLUSTER_ID} --type=merge --subresource status --patch 
'{"status":{"jobStatus":{"upgradeSavepointPath":null,"savepointInfo":{"lastSavepoint":{"timeStamp":
 0, "location": "'$location'", "triggerNonce": 0}}}}}'
 
 # Delete operator Pod to clear CR state cache
 kubectl delete pod -n $(get_operator_pod_namespace) $(get_operator_pod_name)
@@ -151,13 +151,11 @@ wait_for_status $APPLICATION_IDENTIFIER 
'.status.lifecycleState' "SUSPENDED" ${T
 echo "Waiting for upgrade savepoint..."
 snapshot=$(wait_for_snapshot $CLUSTER_ID "savepoint" "upgrade" ${TIMEOUT})
 if [ "$snapshot" == "" ]; then echo "Could not find snapshot"; exit 1; fi
-echo "Found upgrade snapshot: $snapshot"
-wait_for_status $APPLICATION_IDENTIFIER 
'.status.jobStatus.upgradeSnapshotReference.name' "$snapshot" ${TIMEOUT} || 
exit 1
 
 location=$(kubectl get flinksnp/$snapshot -o yaml | yq '.status.path')
 if [ "$location" == "" ]; then echo "Upgrade savepoint location was empty"; 
exit 1; fi
 
-
+wait_for_status $APPLICATION_IDENTIFIER 
'.status.jobStatus.upgradeSavepointPath' "$location" ${TIMEOUT} || exit 1
 
 echo "Restarting deployment..."
 kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job": 
{"state": "running" } } }'
diff --git a/examples/snapshot/job-from-savepoint.yaml 
b/examples/snapshot/job-from-savepoint.yaml
index 698ad220..cc6627a0 100644
--- a/examples/snapshot/job-from-savepoint.yaml
+++ b/examples/snapshot/job-from-savepoint.yaml
@@ -64,6 +64,4 @@ spec:
     jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
     parallelism: 2
     upgradeMode: savepoint
-    flinkStateSnapshotReference:
-      name: example-savepoint
-      namespace: flink
+    initialSavepointPath: 
file:///flink-data/savepoints/savepoint-45305c-d867504446e0
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkStateSnapshotReference.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkStateSnapshotReference.java
deleted file mode 100644
index 548692c9..00000000
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkStateSnapshotReference.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.kubernetes.operator.api.spec;
-
-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-/** Reference for a FlinkStateSnapshot. */
-@Experimental
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-@Builder
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class FlinkStateSnapshotReference {
-
-    /** Namespace of the snapshot resource. */
-    private String namespace;
-
-    /** Name of the snapshot resource. */
-    private String name;
-
-    /**
-     * If a path is given, all other fields will be ignored, and this will be 
used as the initial
-     * savepoint path.
-     */
-    private String path;
-
-    public static FlinkStateSnapshotReference fromPath(String path) {
-        return new FlinkStateSnapshotReference(null, null, path);
-    }
-
-    public static FlinkStateSnapshotReference fromResource(FlinkStateSnapshot 
resource) {
-        return new FlinkStateSnapshotReference(
-                resource.getMetadata().getNamespace(), 
resource.getMetadata().getName(), null);
-    }
-}
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobReference.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobReference.java
index 068b0a64..856e4579 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobReference.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobReference.java
@@ -44,16 +44,9 @@ public class JobReference {
     /** Name of the Flink resource. */
     private String name;
 
-    /**
-     * Namespace of the Flink resource. If empty, the operator will use the 
namespace of the
-     * snapshot.
-     */
-    private String namespace;
-
     public static JobReference fromFlinkResource(AbstractFlinkResource<?, ?> 
flinkResource) {
         var result = new JobReference();
         result.setName(flinkResource.getMetadata().getName());
-        result.setNamespace(flinkResource.getMetadata().getNamespace());
 
         if (flinkResource instanceof FlinkDeployment) {
             result.setKind(JobKind.FLINK_DEPLOYMENT);
@@ -71,6 +64,6 @@ public class JobReference {
         } else if (kind == JobKind.FLINK_SESSION_JOB) {
             kindString = CrdConstants.KIND_SESSION_JOB;
         }
-        return String.format("%s/%s (%s)", namespace, name, kindString);
+        return String.format("%s (%s)", name, kindString);
     }
 }
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java
index 1e845b13..a6c582cc 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java
@@ -72,16 +72,8 @@ public class JobSpec implements Diffable<JobSpec> {
      * redeployments (triggered by changing the savepointRedeployNonce).
      */
     @SpecDiff(DiffType.IGNORE)
-    @Deprecated
     private String initialSavepointPath;
 
-    /**
-     * Snapshot reference used by the job the first time it is deployed or 
during savepoint
-     * redeployments (triggered by changing the savepointRedeployNonce).
-     */
-    @SpecDiff(DiffType.IGNORE)
-    private FlinkStateSnapshotReference flinkStateSnapshotReference;
-
     /**
      * Nonce used to manually trigger checkpoint for the running job. In order 
to trigger a
      * checkpoint, change the number to a different non-null value.
@@ -100,9 +92,8 @@ public class JobSpec implements Diffable<JobSpec> {
 
     /**
      * Nonce used to trigger a full redeployment of the job from the savepoint 
path specified in
-     * initialSavepointPath or the path/FlinkStateSnapshot reference in 
flinkStateSnapshotReference.
-     * In order to trigger redeployment, change the number to a different 
non-null value. Rollback
-     * is not possible after redeployment.
+     * initialSavepointPath. In order to trigger redeployment, change the 
number to a different
+     * non-null value. Rollback is not possible after redeployment.
      */
     @SpecDiff(value = DiffType.SAVEPOINT_REDEPLOY, onNullIgnore = true)
     private Long savepointRedeployNonce;
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
index 28cfde87..6adef53c 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
@@ -18,7 +18,6 @@
 package org.apache.flink.kubernetes.operator.api.status;
 
 import org.apache.flink.annotation.Experimental;
-import 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import io.fabric8.crd.generator.annotation.PrinterColumn;
@@ -51,7 +50,7 @@ public class JobStatus {
     /** Update time of the job. */
     private String updateTime;
 
-    private FlinkStateSnapshotReference upgradeSnapshotReference;
+    private String upgradeSavepointPath;
 
     /** Information about pending and last savepoint for the job. */
     @Deprecated private SavepointInfo savepointInfo = new SavepointInfo();
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
index 862a5dd6..59272af6 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
@@ -23,7 +23,6 @@ import org.apache.flink.autoscaler.utils.DateTimeUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
-import 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
 import org.apache.flink.kubernetes.operator.api.status.Checkpoint;
 import org.apache.flink.kubernetes.operator.api.status.CheckpointInfo;
 import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
@@ -454,10 +453,7 @@ public class SnapshotObserver<
             flinkService
                     .getLastCheckpoint(JobID.fromHexString(jobID), 
observeConfig)
                     .ifPresent(
-                            snapshot ->
-                                    jobStatus.setUpgradeSnapshotReference(
-                                            
FlinkStateSnapshotReference.fromPath(
-                                                    snapshot.getLocation())));
+                            snapshot -> 
jobStatus.setUpgradeSavepointPath(snapshot.getLocation()));
         } catch (Exception e) {
             LOG.error("Could not observe latest checkpoint information.", e);
             throw new ReconciliationException(e);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 8f2c1a7e..cc57889b 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -27,7 +27,6 @@ import 
org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.diff.DiffType;
 import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
-import 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
 import org.apache.flink.kubernetes.operator.api.spec.JobState;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
@@ -41,7 +40,6 @@ import 
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.diff.DiffResult;
 import 
org.apache.flink.kubernetes.operator.reconciler.diff.ReflectiveDiffBuilder;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -122,7 +120,7 @@ public abstract class AbstractFlinkResourceReconciler<
             updateStatusBeforeFirstDeployment(
                     cr, spec, deployConfig, status, ctx.getKubernetesClient());
 
-            deploy(ctx, spec, deployConfig, getInitialSnapshotPath(ctx, spec), 
false);
+            deploy(ctx, spec, deployConfig, getInitialSnapshotPath(spec), 
false);
 
             ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig, 
clock);
             return;
@@ -178,19 +176,11 @@ public abstract class AbstractFlinkResourceReconciler<
         }
     }
 
-    private Optional<String> getInitialSnapshotPath(
-            FlinkResourceContext<CR> ctx, AbstractFlinkSpec spec) {
+    private Optional<String> getInitialSnapshotPath(AbstractFlinkSpec spec) {
         if (spec.getJob() == null) {
             return Optional.empty();
         }
 
-        if (spec.getJob().getFlinkStateSnapshotReference() != null) {
-            return Optional.of(
-                    FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(
-                            ctx.getKubernetesClient(),
-                            spec.getJob().getFlinkStateSnapshotReference()));
-        }
-
         return Optional.ofNullable(spec.getJob().getInitialSavepointPath());
     }
 
@@ -233,16 +223,10 @@ public abstract class AbstractFlinkResourceReconciler<
             CR cr, SPEC spec, Configuration deployConfig, STATUS status, 
KubernetesClient client) {
         if (spec.getJob() != null) {
             var initialUpgradeMode = UpgradeMode.STATELESS;
-            var snapshotRef = spec.getJob().getFlinkStateSnapshotReference();
             var initialSp = spec.getJob().getInitialSavepointPath();
 
-            if (snapshotRef != null) {
-                status.getJobStatus().setUpgradeSnapshotReference(snapshotRef);
-                initialUpgradeMode = UpgradeMode.SAVEPOINT;
-            } else if (initialSp != null) {
-                status.getJobStatus()
-                        .setUpgradeSnapshotReference(
-                                
FlinkStateSnapshotReference.fromPath(initialSp));
+            if (initialSp != null) {
+                status.getJobStatus().setUpgradeSavepointPath(initialSp);
                 initialUpgradeMode = UpgradeMode.SAVEPOINT;
             }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 7d8a15b0..babb05f8 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.diff.DiffType;
 import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
-import 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
 import org.apache.flink.kubernetes.operator.api.spec.JobState;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
@@ -48,7 +47,6 @@ import org.apache.flink.util.Preconditions;
 
 import io.javaoperatorsdk.operator.processing.event.ResourceID;
 import lombok.Value;
-import org.apache.commons.lang3.ObjectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -288,15 +286,7 @@ public abstract class AbstractJobReconciler<
         if (spec.getJob().getUpgradeMode() == UpgradeMode.SAVEPOINT) {
             savepointOpt =
                     Optional.ofNullable(
-                                    ctx.getResource()
-                                            .getStatus()
-                                            .getJobStatus()
-                                            .getUpgradeSnapshotReference())
-                            .map(
-                                    ref ->
-                                            FlinkStateSnapshotUtils
-                                                    
.getValidatedFlinkStateSnapshotPath(
-                                                            
ctx.getKubernetesClient(), ref));
+                            
ctx.getResource().getStatus().getJobStatus().getUpgradeSavepointPath());
             if (savepointOpt.isEmpty()) {
                 savepointOpt =
                         Optional.ofNullable(
@@ -313,28 +303,26 @@ public abstract class AbstractJobReconciler<
     }
 
     /**
-     * Updates the upgrade snapshot reference field in the JobSpec of the 
current Flink resource. If
-     * snapshot resources are enabled, a new FlinkStateSnapshot will be 
created, else it will only
-     * set the path field of the snapshot reference.
+     * Updates the upgrade savepoint field in the JobSpec of the current Flink 
resource and if
+     * snapshot resources are enabled, a new FlinkStateSnapshot will be 
created.
      *
      * @param ctx context
      * @param savepointLocation location of savepoint taken
      */
-    protected void setUpgradeSnapshotReferenceFromSavepoint(
-            FlinkResourceContext<?> ctx, String savepointLocation) {
-        var conf = ObjectUtils.firstNonNull(ctx.getObserveConfig(), new 
Configuration());
+    protected void setUpgradeSavepointPath(FlinkResourceContext<?> ctx, String 
savepointLocation) {
+        var conf = ctx.getObserveConfig();
         var savepointFormatType =
-                
conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
-
-        var snapshotRef =
-                FlinkStateSnapshotUtils.createReferenceForUpgradeSavepoint(
-                        conf,
-                        ctx.getOperatorConfig(),
-                        ctx.getKubernetesClient(),
-                        ctx.getResource(),
-                        
SavepointFormatType.valueOf(savepointFormatType.name()),
-                        savepointLocation);
-        
ctx.getResource().getStatus().getJobStatus().setUpgradeSnapshotReference(snapshotRef);
+                ctx.getObserveConfig()
+                        
.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
+
+        FlinkStateSnapshotUtils.createUpgradeSnapshotResource(
+                conf,
+                ctx.getOperatorConfig(),
+                ctx.getKubernetesClient(),
+                ctx.getResource(),
+                SavepointFormatType.valueOf(savepointFormatType.name()),
+                savepointLocation);
+        
ctx.getResource().getStatus().getJobStatus().setUpgradeSavepointPath(savepointLocation);
     }
 
     /**
@@ -471,11 +459,11 @@ public abstract class AbstractJobReconciler<
         LOG.info("Resubmitting Flink job...");
         SPEC specToRecover = 
ReconciliationUtils.getDeployedSpec(ctx.getResource());
 
-        var upgradeSnapshotRef =
-                
ctx.getResource().getStatus().getJobStatus().getUpgradeSnapshotReference();
+        var upgradeStatePath =
+                
ctx.getResource().getStatus().getJobStatus().getUpgradeSavepointPath();
         var savepointLegacy =
                 
ctx.getResource().getStatus().getJobStatus().getSavepointInfo().getLastSavepoint();
-        var lastSavepointKnown = upgradeSnapshotRef != null || savepointLegacy 
!= null;
+        var lastSavepointKnown = upgradeStatePath != null || savepointLegacy 
!= null;
 
         if (requireHaMetadata) {
             specToRecover.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
@@ -498,21 +486,9 @@ public abstract class AbstractJobReconciler<
         cancelJob(ctx, UpgradeMode.STATELESS);
         currentDeploySpec.getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
 
-        var snapshotRef = 
currentDeploySpec.getJob().getFlinkStateSnapshotReference();
-        var initialSavepointPath = 
currentDeploySpec.getJob().getInitialSavepointPath();
-
-        if (snapshotRef == null && initialSavepointPath != null) {
-            snapshotRef = 
FlinkStateSnapshotReference.fromPath(initialSavepointPath);
-        }
-
-        Optional<String> savepointPath = Optional.empty();
-        if (snapshotRef != null) {
-            savepointPath =
-                    Optional.of(
-                            
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(
-                                    ctx.getKubernetesClient(), snapshotRef));
-            status.getJobStatus().setUpgradeSnapshotReference(snapshotRef);
-        }
+        Optional<String> savepointPath =
+                
Optional.ofNullable(currentDeploySpec.getJob().getInitialSavepointPath());
+        
status.getJobStatus().setUpgradeSavepointPath(savepointPath.orElse(null));
 
         if (desiredJobState == JobState.RUNNING) {
             deploy(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 34406b9d..ac6c69e9 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -26,7 +26,6 @@ import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
-import 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
@@ -167,9 +166,7 @@ public class ApplicationReconciler
             // Last state deployment, explicitly set a dummy savepoint path to 
avoid accidental
             // incorrect state restore in case the HA metadata is deleted by 
the user
             deployConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, 
LAST_STATE_DUMMY_SP_PATH);
-            status.getJobStatus()
-                    .setUpgradeSnapshotReference(
-                            
FlinkStateSnapshotReference.fromPath(LAST_STATE_DUMMY_SP_PATH));
+            
status.getJobStatus().setUpgradeSavepointPath(LAST_STATE_DUMMY_SP_PATH);
         } else {
             // Stateless deployment, remove any user configured savepoint path
             deployConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
@@ -242,7 +239,7 @@ public class ApplicationReconciler
         var conf = ObjectUtils.firstNonNull(ctx.getObserveConfig(), new 
Configuration());
         ctx.getFlinkService()
                 .cancelJob(ctx.getResource(), upgradeMode, conf)
-                .ifPresent(location -> 
setUpgradeSnapshotReferenceFromSavepoint(ctx, location));
+                .ifPresent(location -> setUpgradeSavepointPath(ctx, location));
     }
 
     @Override
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index dcf781c3..3f54a8c7 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -104,7 +104,7 @@ public class SessionJobReconciler
 
         ctx.getFlinkService()
                 .cancelSessionJob(ctx.getResource(), upgradeMode, conf)
-                .ifPresent(location -> 
setUpgradeSnapshotReferenceFromSavepoint(ctx, location));
+                .ifPresent(location -> setUpgradeSavepointPath(ctx, location));
         ctx.getResource().getStatus().getJobStatus().setJobId(null);
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
index 018a080b..ecc6c5c9 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
@@ -41,7 +41,6 @@ import 
io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -72,13 +71,10 @@ public class EventSourceUtils {
                                     if (jobRef == null || jobRef.getName() == 
null) {
                                         return Collections.emptySet();
                                     }
-                                    var namespace =
-                                            
Optional.ofNullable(jobRef.getNamespace())
-                                                    
.orElse(snapshot.getMetadata().getNamespace());
                                     return Set.of(
                                             new ResourceID(
                                                     
snapshot.getSpec().getJobReference().getName(),
-                                                    namespace));
+                                                    
snapshot.getMetadata().getNamespace()));
                                 })
                         .withNamespacesInheritedFromController(context)
                         .followNamespaceChanges(true)
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java
index fa542478..5cfbcb1e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java
@@ -23,7 +23,6 @@ import 
org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
 import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
 import org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec;
-import 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
 import org.apache.flink.kubernetes.operator.api.spec.JobReference;
 import org.apache.flink.kubernetes.operator.api.spec.SavepointSpec;
@@ -39,7 +38,6 @@ import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.processing.event.ResourceID;
 import org.apache.commons.lang3.ObjectUtils;
-import org.apache.commons.lang3.StringUtils;
 
 import javax.annotation.Nullable;
 
@@ -59,71 +57,6 @@ import static 
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEP
 /** Utilities class for FlinkStateSnapshot resources. */
 public class FlinkStateSnapshotUtils {
 
-    /**
-     * From a snapshot reference, return its snapshot path. If a {@link 
FlinkStateSnapshot} is
-     * referenced, it will be retrieved from Kubernetes.
-     *
-     * @param kubernetesClient kubernetes client
-     * @param snapshotRef snapshot reference
-     * @return found savepoint path
-     */
-    public static String getValidatedFlinkStateSnapshotPath(
-            KubernetesClient kubernetesClient, FlinkStateSnapshotReference 
snapshotRef) {
-        if (StringUtils.isNotBlank(snapshotRef.getPath())) {
-            return snapshotRef.getPath();
-        }
-
-        if (StringUtils.isBlank(snapshotRef.getName())) {
-            throw new IllegalArgumentException(
-                    String.format("Invalid snapshot name: %s", 
snapshotRef.getName()));
-        }
-
-        var result =
-                snapshotRef.getNamespace() == null
-                        ? kubernetesClient
-                                .resources(FlinkStateSnapshot.class)
-                                .withName(snapshotRef.getName())
-                                .get()
-                        : kubernetesClient
-                                .resources(FlinkStateSnapshot.class)
-                                .inNamespace(snapshotRef.getNamespace())
-                                .withName(snapshotRef.getName())
-                                .get();
-
-        if (result == null) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Cannot find snapshot %s in namespace %s.",
-                            snapshotRef.getNamespace(), 
snapshotRef.getName()));
-        }
-
-        // We can return the savepoint path if it's marked as completed 
without waiting for the
-        // reconciler to update its status.
-        if (result.getSpec().isSavepoint() && 
result.getSpec().getSavepoint().getAlreadyExists()) {
-            var path = result.getSpec().getSavepoint().getPath();
-            if (!StringUtils.isBlank(path)) {
-                return path;
-            }
-        }
-
-        if (COMPLETED != result.getStatus().getState()) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Snapshot %s/%s is not complete yet.",
-                            snapshotRef.getNamespace(), 
snapshotRef.getName()));
-        }
-
-        var path = result.getStatus().getPath();
-        if (StringUtils.isBlank(path)) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Snapshot %s/%s path is incorrect: %s.",
-                            snapshotRef.getNamespace(), snapshotRef.getName(), 
path));
-        }
-
-        return path;
-    }
-
     protected static FlinkStateSnapshot createFlinkStateSnapshot(
             KubernetesClient kubernetesClient,
             String namespace,
@@ -263,9 +196,7 @@ public class FlinkStateSnapshotUtils {
     }
 
     /**
-     * For an upgrade savepoint, create a {@link FlinkStateSnapshot} on the 
Kubernetes cluster and
-     * return its reference if snapshot resources are enabled. In other case 
return a reference
-     * containing only the path.
+     * For an upgrade savepoint, create a {@link FlinkStateSnapshot} on the 
Kubernetes cluster.
      *
      * @param conf job configuration
      * @param operatorConf operator configuration
@@ -273,9 +204,9 @@ public class FlinkStateSnapshotUtils {
      * @param flinkResource referenced Flink resource
      * @param savepointFormatType savepoint format type
      * @param savepointPath path of savepoint
-     * @return reference for snapshot
+     * @return State snapshot resource
      */
-    public static FlinkStateSnapshotReference 
createReferenceForUpgradeSavepoint(
+    public static Optional<FlinkStateSnapshot> createUpgradeSnapshotResource(
             Configuration conf,
             FlinkOperatorConfiguration operatorConf,
             KubernetesClient kubernetesClient,
@@ -283,12 +214,12 @@ public class FlinkStateSnapshotUtils {
             SavepointFormatType savepointFormatType,
             String savepointPath) {
         if (!isSnapshotResourceEnabled(operatorConf, conf)) {
-            return FlinkStateSnapshotReference.fromPath(savepointPath);
+            return Optional.empty();
         }
 
         var disposeOnDelete =
                 
conf.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE);
-        var snapshot =
+        var savepointResource =
                 createSavepointResource(
                         kubernetesClient,
                         flinkResource,
@@ -296,7 +227,7 @@ public class FlinkStateSnapshotUtils {
                         SnapshotTriggerType.UPGRADE,
                         savepointFormatType,
                         disposeOnDelete);
-        return FlinkStateSnapshotReference.fromResource(snapshot);
+        return Optional.of(savepointResource);
     }
 
     /**
@@ -439,9 +370,8 @@ public class FlinkStateSnapshotUtils {
      * @return namespace with the job reference to be found in
      */
     public static ResourceID 
getSnapshotJobReferenceResourceId(FlinkStateSnapshot snapshot) {
-        var namespace =
-                
Optional.ofNullable(snapshot.getSpec().getJobReference().getNamespace())
-                        .orElse(snapshot.getMetadata().getNamespace());
-        return new ResourceID(snapshot.getSpec().getJobReference().getName(), 
namespace);
+        return new ResourceID(
+                snapshot.getSpec().getJobReference().getName(),
+                snapshot.getMetadata().getNamespace());
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
index b3d4dbde..f8ce07f4 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
 import org.apache.flink.kubernetes.operator.api.status.JobStatus;
+import org.apache.flink.kubernetes.operator.api.status.Savepoint;
 import org.apache.flink.kubernetes.operator.api.status.SnapshotInfo;
 import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
 import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
@@ -32,6 +33,7 @@ import 
org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
 import 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.core.util.CronExpression;
 import org.slf4j.Logger;
@@ -360,25 +362,17 @@ public class SnapshotUtils {
      * @return True if last savepoint is known
      */
     public static boolean lastSavepointKnown(CommonStatus<?> status) {
-        var lastSavepoint = 
status.getJobStatus().getUpgradeSnapshotReference();
-
-        if (lastSavepoint != null) {
-            if (StringUtils.isNotBlank(lastSavepoint.getName())) {
-                return true;
-            }
-
-            var location = lastSavepoint.getPath();
-            return location != null
-                    && 
!location.equals(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
-        }
-
-        // Check legacy savepoint field too
-        var lastSavepointLegacy = 
status.getJobStatus().getSavepointInfo().getLastSavepoint();
-        if (lastSavepointLegacy == null) {
+        var location =
+                ObjectUtils.firstNonNull(
+                        status.getJobStatus().getUpgradeSavepointPath(),
+                        Optional.ofNullable(
+                                        
status.getJobStatus().getSavepointInfo().getLastSavepoint())
+                                .map(Savepoint::getLocation)
+                                .orElse(null));
+
+        if (location == null) {
             return true;
         }
-        return !lastSavepointLegacy
-                .getLocation()
-                .equals(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
+        return 
!location.equals(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 994cce8d..b326e96b 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -294,12 +294,6 @@ public class DefaultValidator implements 
FlinkResourceValidator {
             return Optional.of("Job parallelism must be larger than 0");
         }
 
-        if (!StringUtils.isNullOrWhitespaceOnly(job.getInitialSavepointPath())
-                && job.getFlinkStateSnapshotReference() != null) {
-            return Optional.of(
-                    "Cannot set both initialSavepointPath and 
flinkStateSnapshotReference in the job spec");
-        }
-
         return Optional.empty();
     }
 
@@ -474,10 +468,9 @@ public class DefaultValidator implements 
FlinkResourceValidator {
             if (newJob.getSavepointRedeployNonce() != null
                     && !newJob.getSavepointRedeployNonce()
                             .equals(oldJob.getSavepointRedeployNonce())) {
-                if 
(StringUtils.isNullOrWhitespaceOnly(newJob.getInitialSavepointPath())
-                        && newJob.getFlinkStateSnapshotReference() == null) {
+                if 
(StringUtils.isNullOrWhitespaceOnly(newJob.getInitialSavepointPath())) {
                     return Optional.of(
-                            "InitialSavepointPath and 
flinkStateSnapshotReference must not be empty for savepoint redeployment");
+                            "InitialSavepointPath must not be empty for 
savepoint redeployment");
                 }
             }
         }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 1bb04216..32e49502 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
-import 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
 import org.apache.flink.kubernetes.operator.api.spec.JobState;
@@ -397,9 +396,7 @@ public class FlinkDeploymentControllerTest {
                 new TaskManagerInfo(
                         "component=taskmanager,app=" + 
appCluster.getMetadata().getName(), 1),
                 appCluster.getStatus().getTaskManager());
-        assertEquals(
-                FlinkStateSnapshotReference.fromPath("s0"),
-                
appCluster.getStatus().getJobStatus().getUpgradeSnapshotReference());
+        assertEquals("s0", 
appCluster.getStatus().getJobStatus().getUpgradeSavepointPath());
 
         var previousJobs = new ArrayList<>(jobs);
         appCluster.getSpec().getJob().setInitialSavepointPath("s1");
@@ -407,9 +404,7 @@ public class FlinkDeploymentControllerTest {
         // Send in a no-op change
         testController.reconcile(appCluster, context);
         assertEquals(previousJobs, new ArrayList<>(flinkService.listJobs()));
-        assertEquals(
-                FlinkStateSnapshotReference.fromPath("s0"),
-                
appCluster.getStatus().getJobStatus().getUpgradeSnapshotReference());
+        assertEquals("s0", 
appCluster.getStatus().getJobStatus().getUpgradeSavepointPath());
 
         // Upgrade job
         appCluster.getSpec().getJob().setParallelism(100);
@@ -425,8 +420,7 @@ public class FlinkDeploymentControllerTest {
                         .getState());
         assertEquals(new TaskManagerInfo("", 0), 
appCluster.getStatus().getTaskManager());
         assertEquals(
-                FlinkStateSnapshotReference.fromPath("savepoint_0"),
-                
appCluster.getStatus().getJobStatus().getUpgradeSnapshotReference());
+                "savepoint_0", 
appCluster.getStatus().getJobStatus().getUpgradeSavepointPath());
 
         testController.reconcile(appCluster, context);
         jobs = flinkService.listJobs();
@@ -441,8 +435,7 @@ public class FlinkDeploymentControllerTest {
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         assertEquals(
-                FlinkStateSnapshotReference.fromPath("savepoint_1"),
-                
appCluster.getStatus().getJobStatus().getUpgradeSnapshotReference());
+                "savepoint_1", 
appCluster.getStatus().getJobStatus().getUpgradeSavepointPath());
 
         // Resume from last savepoint
         appCluster.getSpec().getJob().setState(JobState.RUNNING);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
index fb916b66..ba6af301 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
-import 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.JobState;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
@@ -193,9 +192,7 @@ class FlinkSessionJobControllerTest {
         var jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
         assertEquals("s0", jobs.get(0).f0);
-        assertEquals(
-                FlinkStateSnapshotReference.fromPath("s0"),
-                
sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference());
+        assertEquals("s0", 
sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
 
         var previousJobs = new ArrayList<>(jobs);
         sessionJob.getSpec().getJob().setInitialSavepointPath("s1");
@@ -203,16 +200,13 @@ class FlinkSessionJobControllerTest {
         // Send in a no-op change
         testController.reconcile(sessionJob, context);
         assertEquals(previousJobs, new ArrayList<>(flinkService.listJobs()));
-        assertEquals(
-                FlinkStateSnapshotReference.fromPath("s0"),
-                
sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference());
+        assertEquals("s0", 
sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
 
         // Upgrade job
         sessionJob.getSpec().getJob().setParallelism(100);
         updateControl = testController.reconcile(sessionJob, context);
         assertEquals(
-                FlinkStateSnapshotReference.fromPath("savepoint_0"),
-                
sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference());
+                "savepoint_0", 
sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
 
         assertEquals(0L, updateControl.getScheduleDelay().get());
         assertEquals(
@@ -232,8 +226,7 @@ class FlinkSessionJobControllerTest {
         assertEquals("savepoint_0", jobs.get(0).f0);
         testController.reconcile(sessionJob, context);
         assertEquals(
-                FlinkStateSnapshotReference.fromPath("savepoint_0"),
-                
sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference());
+                "savepoint_0", 
sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
 
         // Suspend job
         sessionJob.getSpec().getJob().setState(JobState.SUSPENDED);
@@ -247,8 +240,7 @@ class FlinkSessionJobControllerTest {
         assertEquals(1, jobs.size());
         assertEquals("savepoint_1", jobs.get(0).f0);
         assertEquals(
-                FlinkStateSnapshotReference.fromPath("savepoint_1"),
-                
sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference());
+                "savepoint_1", 
sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
 
         testController.reconcile(sessionJob, context);
         testController.cleanup(sessionJob, context);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
index 6c4bd57a..81cf121b 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
@@ -514,10 +514,8 @@ public class FlinkStateSnapshotControllerTest {
         var snapshot = createSavepoint(deployment);
         var errorMessage =
                 String.format(
-                        "Secondary resource %s/%s (%s) for savepoint 
snapshot-test was not found",
-                        deployment.getMetadata().getNamespace(),
-                        deployment.getMetadata().getName(),
-                        CrdConstants.KIND_FLINK_DEPLOYMENT);
+                        "Secondary resource %s (%s) for savepoint 
snapshot-test was not found",
+                        deployment.getMetadata().getName(), 
CrdConstants.KIND_FLINK_DEPLOYMENT);
 
         // First reconcile will trigger the snapshot.
         controller.reconcile(snapshot, TestUtils.createSnapshotContext(client, 
deployment));
@@ -558,10 +556,8 @@ public class FlinkStateSnapshotControllerTest {
         var snapshot = createSavepoint(deployment);
         var errorMessage =
                 String.format(
-                        "Secondary resource %s/%s (%s) for savepoint 
snapshot-test is not running",
-                        deployment.getMetadata().getNamespace(),
-                        deployment.getMetadata().getName(),
-                        CrdConstants.KIND_FLINK_DEPLOYMENT);
+                        "Secondary resource %s (%s) for savepoint 
snapshot-test is not running",
+                        deployment.getMetadata().getName(), 
CrdConstants.KIND_FLINK_DEPLOYMENT);
 
         controller.reconcile(snapshot, context);
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 7cf1b749..9f79c407 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -493,9 +493,7 @@ public class ApplicationObserverTest extends 
OperatorTestBase {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.FAILED.name(),
                 deployment.getStatus().getJobStatus().getState());
-        assertEquals(
-                "last-SP",
-                
deployment.getStatus().getJobStatus().getUpgradeSnapshotReference().getPath());
+        assertEquals("last-SP", 
deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
         
assertFalse(SnapshotUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
 
         observer.observe(deployment, readyContext);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 51f90fb8..6a5f60ad 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -40,7 +40,6 @@ import org.apache.flink.kubernetes.operator.api.CrdConstants;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
-import 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.JobReference;
 import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
@@ -174,7 +173,7 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
 
         // clean up
         
assertNull(deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());
-        
assertNull(deployment.getStatus().getJobStatus().getUpgradeSnapshotReference());
+        
assertNull(deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
         reconciler.cleanup(
                 deployment, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
 
@@ -185,11 +184,11 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
                             
assertThat(snapshot.getSpec().getSavepoint().getPath())
                                     .isEqualTo("savepoint_0");
                             assertEquals(
-                                    
FlinkStateSnapshotReference.fromResource(snapshot),
+                                    "savepoint_0",
                                     deployment
                                             .getStatus()
                                             .getJobStatus()
-                                            .getUpgradeSnapshotReference());
+                                            .getUpgradeSavepointPath());
                         });
     }
 
@@ -209,7 +208,7 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
         verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
 
         // clean up
-        
assertNull(deployment.getStatus().getJobStatus().getUpgradeSnapshotReference());
+        
assertNull(deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
         reconciler.cleanup(
                 deployment, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
 
@@ -220,11 +219,11 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
                             
assertThat(snapshot.getSpec().getSavepoint().getPath())
                                     .isEqualTo("savepoint_0");
                             assertEquals(
-                                    
FlinkStateSnapshotReference.fromResource(snapshot),
+                                    "savepoint_0",
                                     deployment
                                             .getStatus()
                                             .getJobStatus()
-                                            .getUpgradeSnapshotReference());
+                                            .getUpgradeSavepointPath());
                         });
     }
 
@@ -554,7 +553,7 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
         verifyAndSetRunningJobsToStatus(deployment, runningJobs);
         assertFalse(isSnapshotInProgress.test(getJobStatus(deployment)));
         assertNull(getSnapshotInfo.apply(deployment).getLastSnapshot());
-        
assertNull(deployment.getStatus().getJobStatus().getUpgradeSnapshotReference());
+        
assertNull(deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
         assertNull(getLastSnapshotStatus(deployment, snapshotType));
 
         FlinkDeployment snDeployment = ReconciliationUtils.clone(deployment);
@@ -563,7 +562,7 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
         reconciler.reconcile(snDeployment, context);
         assertFalse(isSnapshotInProgress.test((getJobStatus(snDeployment))));
         assertNull(getSnapshotInfo.apply(deployment).getLastSnapshot());
-        
assertNull(deployment.getStatus().getJobStatus().getUpgradeSnapshotReference());
+        
assertNull(deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
         assertNull(getLastSnapshotStatus(snDeployment, snapshotType));
 
         // trigger when nonce is defined
@@ -1199,11 +1198,11 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
                             
assertThat(snapshot.getSpec().getSavepoint().getPath())
                                     .isEqualTo("savepoint_0");
                             assertEquals(
-                                    
FlinkStateSnapshotReference.fromResource(snapshot),
+                                    "savepoint_0",
                                     deployment
                                             .getStatus()
                                             .getJobStatus()
-                                            .getUpgradeSnapshotReference());
+                                            .getUpgradeSavepointPath());
                         });
     }
 
@@ -1376,7 +1375,7 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
                 status.getJobManagerDeploymentStatus());
 
         // Verify that savepoint and upgrade mode is recorded correctly in 
reconciled spec
-        assertEquals(savepoint, 
status.getJobStatus().getUpgradeSnapshotReference().getPath());
+        assertEquals(savepoint, 
status.getJobStatus().getUpgradeSavepointPath());
         assertEquals(
                 UpgradeMode.SAVEPOINT,
                 status.getReconciliationStatus()
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index e4117659..465c3cf6 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
-import 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.JobState;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
@@ -45,7 +44,6 @@ import 
org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
 import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
-import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -178,38 +176,25 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
                 
snapshots.get(0).getMetadata().getLabels().get(CrdConstants.LABEL_SNAPSHOT_TYPE));
     }
 
-    private static Stream<Arguments> flinkVersionsWithLegacyMode() {
-        return TestUtils.flinkVersions()
-                .flatMap(
-                        arg ->
-                                Stream.of(
-                                        Arguments.of(arg.get()[0], true),
-                                        Arguments.of(arg.get()[0], false)));
-    }
-
     @ParameterizedTest
-    @MethodSource("flinkVersionsWithLegacyMode")
-    public void testUpgradeFromStatelessToLastState(
-            FlinkVersion flinkVersion, boolean legacySnapshots) throws 
Exception {
-        testUpgradeToLastState(flinkVersion, UpgradeMode.STATELESS, 
legacySnapshots);
+    
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+    public void testUpgradeFromStatelessToLastState(FlinkVersion flinkVersion) 
throws Exception {
+        testUpgradeToLastState(flinkVersion, UpgradeMode.STATELESS);
     }
 
     @ParameterizedTest
-    @MethodSource("flinkVersionsWithLegacyMode")
-    public void testUpgradeFromSavepointToLastState(
-            FlinkVersion flinkVersion, boolean legacySnapshots) throws 
Exception {
-        testUpgradeToLastState(flinkVersion, UpgradeMode.SAVEPOINT, 
legacySnapshots);
+    
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+    public void testUpgradeFromSavepointToLastState(FlinkVersion flinkVersion) 
throws Exception {
+        testUpgradeToLastState(flinkVersion, UpgradeMode.SAVEPOINT);
     }
 
     @ParameterizedTest
-    @MethodSource("flinkVersionsWithLegacyMode")
-    public void testUpgradeFromLastStateToLastState(
-            FlinkVersion flinkVersion, boolean legacySnapshots) throws 
Exception {
-        testUpgradeToLastState(flinkVersion, UpgradeMode.LAST_STATE, 
legacySnapshots);
+    
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+    public void testUpgradeFromLastStateToLastState(FlinkVersion flinkVersion) 
throws Exception {
+        testUpgradeToLastState(flinkVersion, UpgradeMode.LAST_STATE);
     }
 
-    private void testUpgradeToLastState(
-            FlinkVersion flinkVersion, UpgradeMode fromUpgradeMode, boolean 
legacySnapshots)
+    private void testUpgradeToLastState(FlinkVersion flinkVersion, UpgradeMode 
fromUpgradeMode)
             throws Exception {
         FlinkDeployment deployment = buildApplicationCluster(flinkVersion, 
fromUpgradeMode);
 
@@ -250,27 +235,7 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
         deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
         deployment.getSpec().setRestartNonce(200L);
         flinkService.setHaDataAvailable(false);
-        if (legacySnapshots) {
-            deployment
-                    .getStatus()
-                    .getJobStatus()
-                    .setUpgradeSnapshotReference(
-                            
FlinkStateSnapshotReference.fromPath("finished_sp"));
-        } else {
-            var snapshot =
-                    FlinkStateSnapshotUtils.createSavepointResource(
-                            kubernetesClient,
-                            deployment,
-                            "finished_sp",
-                            SnapshotTriggerType.UPGRADE,
-                            SavepointFormatType.CANONICAL,
-                            false);
-            deployment
-                    .getStatus()
-                    .getJobStatus()
-                    .setUpgradeSnapshotReference(
-                            
FlinkStateSnapshotReference.fromResource(snapshot));
-        }
+        
deployment.getStatus().getJobStatus().setUpgradeSavepointPath("finished_sp");
         deployment.getStatus().getJobStatus().setState("FINISHED");
         
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
         deployment
@@ -308,11 +273,7 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
                                     SavepointFormatType.CANONICAL,
                                     0L));
         } else {
-            deployment
-                    .getStatus()
-                    .getJobStatus()
-                    .setUpgradeSnapshotReference(
-                            
FlinkStateSnapshotReference.fromPath(savepointPath));
+            
deployment.getStatus().getJobStatus().setUpgradeSavepointPath(savepointPath);
             deployment
                     .getStatus()
                     .getJobStatus()
@@ -461,8 +422,7 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
         if (initSavepoint) {
             assertEquals("init-sp", flinkService.listJobs().get(0).f0);
             assertEquals(
-                    "init-sp",
-                    
deployment.getStatus().getJobStatus().getUpgradeSnapshotReference().getPath());
+                    "init-sp", 
deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
             assertEquals(UpgradeMode.SAVEPOINT, 
lastReconciledSpec.getJob().getUpgradeMode());
         } else {
             assertNull(flinkService.listJobs().get(0).f0);
@@ -676,12 +636,12 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
         if (checkpointAvailable) {
             assertEquals(
                     ApplicationReconciler.LAST_STATE_DUMMY_SP_PATH,
-                    
deployment.getStatus().getJobStatus().getUpgradeSnapshotReference().getPath());
+                    
deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
             assertEquals(
                     ApplicationReconciler.LAST_STATE_DUMMY_SP_PATH,
                     flinkService.listJobs().get(0).f0);
         } else {
-            
assertNull(deployment.getStatus().getJobStatus().getUpgradeSnapshotReference());
+            
assertNull(deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
             assertNull(flinkService.listJobs().get(0).f0);
         }
     }
@@ -798,7 +758,7 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
         // trigger a savepoint
         assertEquals(
                 ApplicationReconciler.LAST_STATE_DUMMY_SP_PATH,
-                
deployment.getStatus().getJobStatus().getUpgradeSnapshotReference().getPath());
+                
deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
         assertEquals(
                 ApplicationReconciler.LAST_STATE_DUMMY_SP_PATH, 
flinkService.listJobs().get(0).f0);
     }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index 88b4cf5b..6ab77705 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
-import 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
 import org.apache.flink.kubernetes.operator.api.spec.JobState;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
@@ -130,8 +129,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
 
         if (legacySnapshots) {
             assertEquals(
-                    "savepoint_0",
-                    
sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference().getPath());
+                    "savepoint_0", 
sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
         } else {
             var snapshots =
                     TestUtils.getFlinkStateSnapshotsForResource(
@@ -139,8 +137,8 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
             assertThat(snapshots).isNotEmpty();
             assertEquals("savepoint_0", 
snapshots.get(0).getSpec().getSavepoint().getPath());
             assertEquals(
-                    FlinkStateSnapshotReference.fromResource(snapshots.get(0)),
-                    
sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference());
+                    snapshots.get(0).getSpec().getSavepoint().getPath(),
+                    
sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
         }
     }
 
@@ -173,8 +171,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
                 sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
         if (legacySnapshots) {
             assertEquals(
-                    "savepoint_0",
-                    
sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference().getPath());
+                    "savepoint_0", 
sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
         } else {
             var snapshots =
                     TestUtils.getFlinkStateSnapshotsForResource(
@@ -182,8 +179,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
             assertThat(snapshots).isNotEmpty();
             assertEquals("savepoint_0", 
snapshots.get(0).getSpec().getSavepoint().getPath());
             assertEquals(
-                    FlinkStateSnapshotReference.fromResource(snapshots.get(0)),
-                    
sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference());
+                    "savepoint_0", 
sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
         }
     }
 
@@ -412,11 +408,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
         if (legacySnapshots) {
             assertEquals(
                     "savepoint_0",
-                    statefulSessionJob
-                            .getStatus()
-                            .getJobStatus()
-                            .getUpgradeSnapshotReference()
-                            .getPath());
+                    
statefulSessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
         } else {
             var snapshots =
                     TestUtils.getFlinkStateSnapshotsForResource(
@@ -430,8 +422,8 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
                             .getLabels()
                             .get(CrdConstants.LABEL_SNAPSHOT_TYPE));
             assertEquals(
-                    FlinkStateSnapshotReference.fromResource(snapshots.get(0)),
-                    
statefulSessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference());
+                    snapshots.get(0).getSpec().getSavepoint().getPath(),
+                    
statefulSessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
         }
 
         flinkService.clear();
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index f9b4781c..3cfa562b 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -302,7 +302,7 @@ public class AbstractFlinkServiceTest {
         assertTrue(cancelFuture.isDone());
         assertEquals(jobID, cancelFuture.get());
         assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
-        assertNull(jobStatus.getUpgradeSnapshotReference());
+        assertNull(jobStatus.getUpgradeSavepointPath());
     }
 
     @ParameterizedTest
@@ -541,7 +541,7 @@ public class AbstractFlinkServiceTest {
                 configManager.getObserveConfig(deployment),
                 false);
         assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
-        assertNull(jobStatus.getUpgradeSnapshotReference());
+        assertNull(jobStatus.getUpgradeSavepointPath());
     }
 
     @Test
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
index fd136969..1a3dc8e6 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
-import 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.JobKind;
 import org.apache.flink.kubernetes.operator.api.spec.JobReference;
@@ -31,7 +30,6 @@ import 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
 import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
 import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -47,14 +45,10 @@ import java.util.stream.Collectors;
 import static org.apache.flink.kubernetes.operator.TestUtils.reconcileSpec;
 import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
 import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.IN_PROGRESS;
-import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.TRIGGER_PENDING;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE;
-import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests for {@link FlinkStateSnapshotUtils}. */
@@ -62,8 +56,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class FlinkStateSnapshotUtilsTest {
 
     private KubernetesClient client;
-
-    private final FlinkConfigManager configManager = new 
FlinkConfigManager(new Configuration());
     private static final String NAMESPACE = "test";
     private static final String SAVEPOINT_NAME = "savepoint-01";
     private static final String SAVEPOINT_PATH = "/tmp/savepoint-01";
@@ -98,87 +90,6 @@ public class FlinkStateSnapshotUtilsTest {
                 .isEqualTo(SnapshotTriggerType.PERIODIC);
     }
 
-    @Test
-    public void testGetValidatedFlinkStateSnapshotPathPathGiven() {
-        var snapshotRef = 
FlinkStateSnapshotReference.builder().path(SAVEPOINT_PATH).build();
-        var snapshotResult =
-                
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(client, snapshotRef);
-        assertEquals(SAVEPOINT_PATH, snapshotResult);
-    }
-
-    @Test
-    public void testGetValidatedFlinkStateSnapshotPathFoundResource() {
-        var snapshot = initSavepoint(COMPLETED, null);
-        client.resource(snapshot).create();
-
-        var snapshotRef =
-                FlinkStateSnapshotReference.builder()
-                        .namespace(NAMESPACE)
-                        .name(SAVEPOINT_NAME)
-                        .build();
-        var snapshotResult =
-                
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(client, snapshotRef);
-        assertEquals(SAVEPOINT_PATH, snapshotResult);
-    }
-
-    @Test
-    public void testGetValidatedFlinkStateSnapshotPathInvalidName() {
-        var snapshotRef =
-                
FlinkStateSnapshotReference.builder().namespace(NAMESPACE).name("  ").build();
-        assertThrows(
-                IllegalArgumentException.class,
-                () ->
-                        
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(
-                                client, snapshotRef));
-    }
-
-    @Test
-    public void testGetValidatedFlinkStateSnapshotPathNotFound() {
-        var snapshotRef =
-                FlinkStateSnapshotReference.builder()
-                        .namespace("not-exists")
-                        .name("not-exists")
-                        .build();
-        assertThrows(
-                IllegalStateException.class,
-                () ->
-                        
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(
-                                client, snapshotRef));
-    }
-
-    @Test
-    public void testGetAndValidateFlinkStateSnapshotAlreadyExists() {
-        var snapshot = initSavepoint(TRIGGER_PENDING, null);
-        snapshot.getSpec().getSavepoint().setAlreadyExists(true);
-        client.resource(snapshot).create();
-
-        var snapshotRef =
-                FlinkStateSnapshotReference.builder()
-                        .namespace(NAMESPACE)
-                        .name(SAVEPOINT_NAME)
-                        .build();
-        var snapshotResult =
-                
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(client, snapshotRef);
-        assertEquals(SAVEPOINT_PATH, snapshotResult);
-    }
-
-    @Test
-    public void testGetValidatedFlinkStateSnapshotPathNotCompleted() {
-        var snapshot = initSavepoint(IN_PROGRESS, null);
-        client.resource(snapshot).create();
-
-        var snapshotRef =
-                FlinkStateSnapshotReference.builder()
-                        .namespace(NAMESPACE)
-                        .name(SAVEPOINT_NAME)
-                        .build();
-        assertThrows(
-                IllegalStateException.class,
-                () ->
-                        
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(
-                                client, snapshotRef));
-    }
-
     @Test
     public void testGetFlinkStateSnapshotsForResource() {
         var deployment = initDeployment();
@@ -298,45 +209,25 @@ public class FlinkStateSnapshotUtilsTest {
         var conf = new Configuration();
         conf.set(OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE, disposeOnDelete);
         var operatorConf = FlinkOperatorConfiguration.fromConfiguration(conf);
-        var result =
-                FlinkStateSnapshotUtils.createReferenceForUpgradeSavepoint(
-                        conf,
-                        operatorConf,
-                        client,
-                        deployment,
-                        SavepointFormatType.CANONICAL,
-                        SAVEPOINT_PATH);
+        FlinkStateSnapshotUtils.createUpgradeSnapshotResource(
+                conf,
+                operatorConf,
+                client,
+                deployment,
+                SavepointFormatType.CANONICAL,
+                SAVEPOINT_PATH);
         var snapshots = TestUtils.getFlinkStateSnapshotsForResource(client, 
deployment);
         assertThat(snapshots)
                 .hasSize(1)
                 .allSatisfy(
                         snapshot -> {
-                            assertEquals(snapshot.getMetadata().getName(), 
result.getName());
-                            assertEquals(
-                                    snapshot.getMetadata().getNamespace(), 
result.getNamespace());
                             assertEquals(
                                     disposeOnDelete,
                                     
snapshot.getSpec().getSavepoint().getDisposeOnDelete());
+                            assertEquals(
+                                    SAVEPOINT_PATH, 
snapshot.getSpec().getSavepoint().getPath());
+                            
assertTrue(snapshot.getSpec().getSavepoint().getAlreadyExists());
                         });
-        assertNull(result.getPath());
-    }
-
-    @Test
-    public void testCreateReferenceForUpgradeSavepointWithPath() {
-        var deployment = initDeployment();
-        var conf = new Configuration().set(SNAPSHOT_RESOURCE_ENABLED, false);
-        var operatorConf = FlinkOperatorConfiguration.fromConfiguration(conf);
-        var result =
-                FlinkStateSnapshotUtils.createReferenceForUpgradeSavepoint(
-                        conf,
-                        operatorConf,
-                        client,
-                        deployment,
-                        SavepointFormatType.CANONICAL,
-                        SAVEPOINT_PATH);
-        assertEquals(SAVEPOINT_PATH, result.getPath());
-        assertNull(result.getNamespace());
-        assertNull(result.getName());
     }
 
     @Test
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
index 23720df7..37f05309 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
@@ -319,20 +318,12 @@ public class SnapshotUtilsTest {
         sp.setLocation(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
         assertFalse(SnapshotUtils.lastSavepointKnown(status));
 
-        status.getJobStatus()
-                
.setUpgradeSnapshotReference(FlinkStateSnapshotReference.fromPath("sp1"));
+        status.getJobStatus().setUpgradeSavepointPath("sp1");
         assertTrue(SnapshotUtils.lastSavepointKnown(status));
 
         status.getJobStatus()
-                .setUpgradeSnapshotReference(
-                        FlinkStateSnapshotReference.fromPath(
-                                
AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH));
+                
.setUpgradeSavepointPath(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
         assertFalse(SnapshotUtils.lastSavepointKnown(status));
-
-        status.getJobStatus()
-                .setUpgradeSnapshotReference(
-                        new FlinkStateSnapshotReference("namespace", "name", 
null));
-        assertTrue(SnapshotUtils.lastSavepointKnown(status));
     }
 
     private static void resetTrigger(FlinkDeployment deployment, SnapshotType 
snapshotType) {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 69ce55ed..f5d0712f 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -31,7 +31,6 @@ import 
org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
 import org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
-import 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
 import org.apache.flink.kubernetes.operator.api.spec.JobKind;
@@ -527,16 +526,6 @@ public class DefaultValidatorTest {
                     
dep.getSpec().getTaskManager().getResource().setEphemeralStorage("abc");
                 },
                 "TaskManager resource ephemeral storage parse error: Character 
a is neither a decimal digit number, decimal point, nor \"e\" notation 
exponential mark.");
-
-        testError(
-                dep -> {
-                    dep.getSpec()
-                            .getJob()
-                            .setFlinkStateSnapshotReference(
-                                    
FlinkStateSnapshotReference.builder().name("snapshot").build());
-                    dep.getSpec().getJob().setInitialSavepointPath("s0");
-                },
-                "Cannot set both initialSavepointPath and 
flinkStateSnapshotReference in the job spec");
     }
 
     @Test
@@ -615,9 +604,8 @@ public class DefaultValidatorTest {
                             .serializeAndSetLastReconciledSpec(dep.getSpec(), 
dep);
                     job.setSavepointRedeployNonce(1L);
                     job.setInitialSavepointPath(null);
-                    job.setFlinkStateSnapshotReference(null);
                 },
-                "InitialSavepointPath and flinkStateSnapshotReference must not 
be empty for savepoint redeployment");
+                "InitialSavepointPath must not be empty for savepoint 
redeployment");
 
         testError(
                 dep -> {
@@ -628,7 +616,7 @@ public class DefaultValidatorTest {
                     job.setSavepointRedeployNonce(1L);
                     job.setInitialSavepointPath(" ");
                 },
-                "InitialSavepointPath and flinkStateSnapshotReference must not 
be empty for savepoint redeploymen");
+                "InitialSavepointPath must not be empty for savepoint 
redeploymen");
 
         testError(
                 dep -> {
@@ -642,7 +630,7 @@ public class DefaultValidatorTest {
                     job.setSavepointRedeployNonce(2L);
                     job.setInitialSavepointPath(null);
                 },
-                "InitialSavepointPath and flinkStateSnapshotReference must not 
be empty for savepoint redeploymen");
+                "InitialSavepointPath must not be empty for savepoint 
redeploymen");
     }
 
     @ParameterizedTest
@@ -1072,19 +1060,17 @@ public class DefaultValidatorTest {
                 null);
 
         var refName = "does-not-exist";
-        var namespace = "default";
         var snapshot =
                 TestUtils.buildFlinkStateSnapshotSavepoint(
                         false,
                         JobReference.builder()
                                 .kind(JobKind.FLINK_DEPLOYMENT)
                                 .name(refName)
-                                .namespace(namespace)
                                 .build());
         testStateSnapshotValidate(
                 snapshot,
                 Optional.empty(),
-                String.format("Target for snapshot %s/%s was not found", 
namespace, refName));
+                String.format("Target for snapshot test/%s was not found", 
refName));
     }
 
     private void testStateSnapshotValidateWithModifier(
diff --git 
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml 
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 6aabf726..b4c2d4c0 100644
--- 
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ 
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -88,15 +88,6 @@ spec:
                     type: integer
                   entryClass:
                     type: string
-                  flinkStateSnapshotReference:
-                    properties:
-                      name:
-                        type: string
-                      namespace:
-                        type: string
-                      path:
-                        type: string
-                    type: object
                   initialSavepointPath:
                     type: string
                   jarURI:
@@ -10367,15 +10358,8 @@ spec:
                     type: string
                   updateTime:
                     type: string
-                  upgradeSnapshotReference:
-                    properties:
-                      name:
-                        type: string
-                      namespace:
-                        type: string
-                      path:
-                        type: string
-                    type: object
+                  upgradeSavepointPath:
+                    type: string
                 type: object
               lifecycleState:
                 enum:
diff --git 
a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml 
b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
index 0df1aa87..b7ea567d 100644
--- 
a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
+++ 
b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
@@ -49,15 +49,6 @@ spec:
                     type: integer
                   entryClass:
                     type: string
-                  flinkStateSnapshotReference:
-                    properties:
-                      name:
-                        type: string
-                      namespace:
-                        type: string
-                      path:
-                        type: string
-                    type: object
                   initialSavepointPath:
                     type: string
                   jarURI:
@@ -209,15 +200,8 @@ spec:
                     type: string
                   updateTime:
                     type: string
-                  upgradeSnapshotReference:
-                    properties:
-                      name:
-                        type: string
-                      namespace:
-                        type: string
-                      path:
-                        type: string
-                    type: object
+                  upgradeSavepointPath:
+                    type: string
                 type: object
               lifecycleState:
                 enum:
diff --git 
a/helm/flink-kubernetes-operator/crds/flinkstatesnapshots.flink.apache.org-v1.yml
 
b/helm/flink-kubernetes-operator/crds/flinkstatesnapshots.flink.apache.org-v1.yml
index 96ba2ca1..3f2f3475 100644
--- 
a/helm/flink-kubernetes-operator/crds/flinkstatesnapshots.flink.apache.org-v1.yml
+++ 
b/helm/flink-kubernetes-operator/crds/flinkstatesnapshots.flink.apache.org-v1.yml
@@ -48,8 +48,6 @@ spec:
                     type: string
                   name:
                     type: string
-                  namespace:
-                    type: string
                 type: object
               savepoint:
                 properties:

Reply via email to