This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new d9282263 [FLINK-36162] Remove flinkStateSnapshotReference and
namespace from FlinkStateSnapshot jobReference
d9282263 is described below
commit d9282263baad879a6a00f9614d985a0e1a47740c
Author: Gyula Fora <[email protected]>
AuthorDate: Wed Aug 28 11:22:30 2024 +0200
[FLINK-36162] Remove flinkStateSnapshotReference and namespace from
FlinkStateSnapshot jobReference
---
docs/content/docs/concepts/controller-flow.md | 2 +-
.../content/docs/custom-resource/job-management.md | 9 +-
docs/content/docs/custom-resource/reference.md | 17 +--
docs/content/docs/custom-resource/snapshots.md | 10 +-
docs/content/docs/operations/upgrade.md | 7 +-
e2e-tests/test_snapshot.sh | 10 +-
examples/snapshot/job-from-savepoint.yaml | 4 +-
.../api/spec/FlinkStateSnapshotReference.java | 58 ---------
.../kubernetes/operator/api/spec/JobReference.java | 9 +-
.../kubernetes/operator/api/spec/JobSpec.java | 13 +--
.../kubernetes/operator/api/status/JobStatus.java | 3 +-
.../operator/observer/SnapshotObserver.java | 6 +-
.../AbstractFlinkResourceReconciler.java | 24 +---
.../deployment/AbstractJobReconciler.java | 68 ++++-------
.../deployment/ApplicationReconciler.java | 7 +-
.../sessionjob/SessionJobReconciler.java | 2 +-
.../operator/utils/EventSourceUtils.java | 6 +-
.../operator/utils/FlinkStateSnapshotUtils.java | 88 ++------------
.../kubernetes/operator/utils/SnapshotUtils.java | 30 ++---
.../operator/validation/DefaultValidator.java | 11 +-
.../controller/FlinkDeploymentControllerTest.java | 15 +--
.../controller/FlinkSessionJobControllerTest.java | 18 +--
.../FlinkStateSnapshotControllerTest.java | 12 +-
.../deployment/ApplicationObserverTest.java | 4 +-
.../deployment/ApplicationReconcilerTest.java | 23 ++--
.../ApplicationReconcilerUpgradeModeTest.java | 72 +++---------
.../sessionjob/SessionJobReconcilerTest.java | 24 ++--
.../operator/service/AbstractFlinkServiceTest.java | 4 +-
.../utils/FlinkStateSnapshotUtilsTest.java | 129 ++-------------------
.../operator/utils/SnapshotUtilsTest.java | 13 +--
.../operator/validation/DefaultValidatorTest.java | 22 +---
.../crds/flinkdeployments.flink.apache.org-v1.yml | 20 +---
.../crds/flinksessionjobs.flink.apache.org-v1.yml | 20 +---
.../flinkstatesnapshots.flink.apache.org-v1.yml | 2 -
34 files changed, 149 insertions(+), 613 deletions(-)
diff --git a/docs/content/docs/concepts/controller-flow.md
b/docs/content/docs/concepts/controller-flow.md
index 8e8d84c0..b6d876ba 100644
--- a/docs/content/docs/concepts/controller-flow.md
+++ b/docs/content/docs/concepts/controller-flow.md
@@ -98,7 +98,7 @@ It’s very important to understand that the Observer phase
records a point-in-t
The `AbstractFlinkResourceReconciler` encapsulates the core reconciliation
flow for all Flink resource types. Let’s take a look at the high level flow
before we go into specifics for session, application and session job resources.
1. Check if the resource is ready for reconciliation or if there are any
pending operations that should not be interrupted (manual savepoints for
example)
-2. If this is the first deployment attempt for the resource, we simply deploy
it. It’s important to note here that this is the only deploy operation where we
use the `flinkStateSnapshotReference` provided in the spec.
+2. If this is the first deployment attempt for the resource, we simply deploy
it. It’s important to note here that this is the only deploy operation where we
use the `initialSavepointPath` provided in the spec.
3. Next we determine if the desired spec changed and the type of change:
`IGNORE, SCALE, UPGRADE`. Only for scale and upgrade type changes do we need to
execute further reconciliation logic.
4. If we have upgrade/scale spec changes we execute the upgrade logic specific
for the resource type
5. If we did not receive any spec change we still have to ensure that the
currently deployed resources are fully reconciled:
diff --git a/docs/content/docs/custom-resource/job-management.md
b/docs/content/docs/custom-resource/job-management.md
index 790c825a..231f92a8 100644
--- a/docs/content/docs/custom-resource/job-management.md
+++ b/docs/content/docs/custom-resource/job-management.md
@@ -247,17 +247,16 @@ Users have two options to restore a job from a target
savepoint / checkpoint
### Redeploy using the savepointRedeployNonce
-It is possible to redeploy a `FlinkDeployment` or `FlinkSessionJob` resource
from a target savepoint by using the combination of `savepointRedeployNonce`
and `flinkStateSnapshotReference` in the job spec:
+It is possible to redeploy a `FlinkDeployment` or `FlinkSessionJob` resource
from a target savepoint by using the combination of `savepointRedeployNonce`
and `initialSavepointPath` in the job spec:
```yaml
job:
- flinkStateSnapshotReference:
- path: file://redeploy-target-savepoint
+ initialSavepointPath: file://redeploy-target-savepoint
# If not set previously, set to 1, otherwise increment, e.g. 2
savepointRedeployNonce: 1
```
-When changing the `savepointRedeployNonce` the operator will redeploy the job
to the savepoint defined in the `flinkStateSnapshotReference`. The savepoint
path must not be empty.
+When changing the `savepointRedeployNonce` the operator will redeploy the job
to the savepoint defined in the `initialSavepointPath`. The savepoint path must
not be empty.
{{< hint warning >}}
Rollbacks are not supported after redeployments.
@@ -271,7 +270,7 @@ However, this also means that savepoint history is lost and
the operator won't c
1. Locate the latest checkpoint/savepoint metafile in your configured
checkpoint/savepoint directory.
2. Delete the `FlinkDeployment` resource for your application
3. Check that you have the current savepoint, and that your `FlinkDeployment`
is deleted completely
- 4. Modify your `FlinkDeployment` JobSpec and set
`flinkStateSnapshotReference.path` to your last checkpoint location
+ 4. Modify your `FlinkDeployment` JobSpec and set `initialSavepointPath` to
your last checkpoint location
5. Recreate the deployment
These steps ensure that the operator will start completely fresh from the user
defined savepoint path and can hopefully fully recover.
diff --git a/docs/content/docs/custom-resource/reference.md
b/docs/content/docs/custom-resource/reference.md
index 3348f46e..12e1d88c 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -90,17 +90,6 @@ This serves as a full reference for FlinkDeployment and
FlinkSessionJob custom r
| flinkConfiguration | java.util.Map<java.lang.String,java.lang.String> |
Flink configuration overrides for the Flink deployment or Flink session job. |
| deploymentName | java.lang.String | The name of the target session cluster
deployment. |
-### FlinkStateSnapshotReference
-**Class**:
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference
-
-**Description**: Reference for a FlinkStateSnapshot.
-
-| Parameter | Type | Docs |
-| ----------| ---- | ---- |
-| namespace | java.lang.String | Namespace of the snapshot resource. |
-| name | java.lang.String | Name of the snapshot resource. |
-| path | java.lang.String | If a path is given, all other fields will be
ignored, and this will be used as the initial savepoint path. |
-
### FlinkStateSnapshotSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec
@@ -172,7 +161,6 @@ This serves as a full reference for FlinkDeployment and
FlinkSessionJob custom r
| ----------| ---- | ---- |
| kind | org.apache.flink.kubernetes.operator.api.spec.JobKind | Kind of the
Flink resource, FlinkDeployment or FlinkSessionJob. |
| name | java.lang.String | Name of the Flink resource. |
-| namespace | java.lang.String | Namespace of the Flink resource. If empty,
the operator will use the namespace of the snapshot. |
### JobSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobSpec
@@ -188,11 +176,10 @@ This serves as a full reference for FlinkDeployment and
FlinkSessionJob custom r
| state | org.apache.flink.kubernetes.operator.api.spec.JobState | Desired
state for the job. |
| savepointTriggerNonce | java.lang.Long | Nonce used to manually trigger
savepoint for the running job. In order to trigger a savepoint, change the
number to a different non-null value. |
| initialSavepointPath | java.lang.String | Savepoint path used by the job the
first time it is deployed or during savepoint redeployments (triggered by
changing the savepointRedeployNonce). |
-| flinkStateSnapshotReference |
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference |
Snapshot reference used by the job the first time it is deployed or during
savepoint redeployments (triggered by changing the savepointRedeployNonce). |
| checkpointTriggerNonce | java.lang.Long | Nonce used to manually trigger
checkpoint for the running job. In order to trigger a checkpoint, change the
number to a different non-null value. |
| upgradeMode | org.apache.flink.kubernetes.operator.api.spec.UpgradeMode |
Upgrade mode of the Flink job. |
| allowNonRestoredState | java.lang.Boolean | Allow checkpoint state that
cannot be mapped to any job vertex in tasks. |
-| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full
redeployment of the job from the savepoint path specified in
initialSavepointPath or the path/FlinkStateSnapshot reference in
flinkStateSnapshotReference. In order to trigger redeployment, change the
number to a different non-null value. Rollback is not possible after
redeployment. |
+| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full
redeployment of the job from the savepoint path specified in
initialSavepointPath. In order to trigger redeployment, change the number to a
different non-null value. Rollback is not possible after redeployment. |
### JobState
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobState
@@ -418,7 +405,7 @@ This serves as a full reference for FlinkDeployment and
FlinkSessionJob custom r
| state | java.lang.String | Last observed state of the job. |
| startTime | java.lang.String | Start time of the job. |
| updateTime | java.lang.String | Update time of the job. |
-| upgradeSnapshotReference |
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference | |
+| upgradeSavepointPath | java.lang.String | |
| savepointInfo |
org.apache.flink.kubernetes.operator.api.status.SavepointInfo | Information
about pending and last savepoint for the job. |
| checkpointInfo |
org.apache.flink.kubernetes.operator.api.status.CheckpointInfo | Information
about pending and last checkpoint for the job. |
diff --git a/docs/content/docs/custom-resource/snapshots.md
b/docs/content/docs/custom-resource/snapshots.md
index e0d39835..3c1771bc 100644
--- a/docs/content/docs/custom-resource/snapshots.md
+++ b/docs/content/docs/custom-resource/snapshots.md
@@ -38,7 +38,7 @@ If you set this to false, the operator will keep using the
deprecated status fie
To create a savepoint or checkpoint, exactly one of the spec fields
`savepoint` or `checkpoint` must present.
Furthermore, in case of a savepoint you can signal to the operator that the
savepoint already exists using the `alreadyExists` field, and the operator will
mark it as a successful snapshot in the next reconciliation phase.
-You can also instruct the Operator to start a new
FlinkDeployment/FlinkSessionJob from an existing snapshot CR by using
`flinkStateSnapshotReference` in the job spec.
+You can also instruct the Operator to start a new
FlinkDeployment/FlinkSessionJob from an existing snapshot by using
`initialSavepointPath` in the job spec.
## Examples
@@ -78,11 +78,11 @@ spec:
### Start job from existing snapshot
+To start a job from an existing snapshot, you need to extract the path then
use:
+
```yaml
job:
- flinkStateSnapshotReference:
- namespace: flink # not required if it's in the same namespace
- name: example-savepoint
+ initialSavepointPath: [savepoint_path]
```
{{< hint warning >}}
@@ -131,7 +131,7 @@ This feature is not available for checkpoints.
## Triggering snapshots
Upgrade savepoints are triggered automatically by the system during the
upgrade process as we have seen in the previous sections.
-In this case, the savepoint path will also be recorded in the
`upgradeSnapshotReference` job status field, which the operator will use when
restarting the job.
+In this case, the savepoint path will also be recorded in the
`upgradeSavepointPath` job status field, which the operator will use when
restarting the job.
For backup, job forking and other purposes savepoint and checkpoints can be
triggered manually or periodically by the operator, however generally speaking
these will not be used during upgrades and are not required for the correct
operation.
diff --git a/docs/content/docs/operations/upgrade.md
b/docs/content/docs/operations/upgrade.md
index 65f188b4..e9b7ec92 100644
--- a/docs/content/docs/operations/upgrade.md
+++ b/docs/content/docs/operations/upgrade.md
@@ -148,20 +148,19 @@ Here is a reference example of upgrading a
`basic-checkpoint-ha-example` deploym
```
5. Restore the job:
- Deploy the previously deleted job using this
[FlinkDeployemnt](https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml)
with `v1beta1` and explicitly set the `job.flinkStateSnapshotReference.path`
to the savepoint location obtained from the step 1.
+ Deploy the previously deleted job using this
[FlinkDeployemnt](https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml)
with `v1beta1` and explicitly set the `job.initialSavepointPath` to the
savepoint location obtained from the step 1.
```
spec:
...
job:
- flinkStateSnapshotReference:
- path: /flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata
+ initialSavepointPath:
/flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata
upgradeMode: savepoint
...
```
Alternatively, we may use this command to edit and deploy the manifest:
```sh
- wget -qO -
https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml|
yq w - "spec.job.flinkStateSnapshotReference.path"
"/flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata"| kubectl apply
-f -
+ wget -qO -
https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml|
yq w - "spec.job.initialSavepointPath"
"/flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata"| kubectl apply
-f -
```
Finally, verify that `deploy/basic-checkpoint-ha-example` log has:
```
diff --git a/e2e-tests/test_snapshot.sh b/e2e-tests/test_snapshot.sh
index 8d080465..32006ddb 100755
--- a/e2e-tests/test_snapshot.sh
+++ b/e2e-tests/test_snapshot.sh
@@ -56,10 +56,10 @@ wait_for_status $APPLICATION_IDENTIFIER
'.status.jobStatus.state' RUNNING ${TIME
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch
'{"spec":{"job":{"state": "suspended"}}}'
wait_for_status $APPLICATION_IDENTIFIER '.status.lifecycleState' "SUSPENDED"
${TIMEOUT} || exit 1
-location=$(kubectl get $APPLICATION_IDENTIFIER -o yaml | yq
'.status.jobStatus.upgradeSnapshotReference.path')
+location=$(kubectl get $APPLICATION_IDENTIFIER -o yaml | yq
'.status.jobStatus.upgradeSavepointPath')
if [ "$location" == "" ]; then echo "Legacy savepoint location was empty";
exit 1; fi
-echo "Removing upgradeSnapshotReference and setting lastSavepoint"
-kubectl patch flinkdep ${CLUSTER_ID} --type=merge --subresource status --patch
'{"status":{"jobStatus":{"upgradeSnapshotReference":null,"savepointInfo":{"lastSavepoint":{"timeStamp":
0, "location": "'$location'", "triggerNonce": 0}}}}}'
+echo "Removing upgradeSavepointPath and setting lastSavepoint"
+kubectl patch flinkdep ${CLUSTER_ID} --type=merge --subresource status --patch
'{"status":{"jobStatus":{"upgradeSavepointPath":null,"savepointInfo":{"lastSavepoint":{"timeStamp":
0, "location": "'$location'", "triggerNonce": 0}}}}}'
# Delete operator Pod to clear CR state cache
kubectl delete pod -n $(get_operator_pod_namespace) $(get_operator_pod_name)
@@ -151,13 +151,11 @@ wait_for_status $APPLICATION_IDENTIFIER
'.status.lifecycleState' "SUSPENDED" ${T
echo "Waiting for upgrade savepoint..."
snapshot=$(wait_for_snapshot $CLUSTER_ID "savepoint" "upgrade" ${TIMEOUT})
if [ "$snapshot" == "" ]; then echo "Could not find snapshot"; exit 1; fi
-echo "Found upgrade snapshot: $snapshot"
-wait_for_status $APPLICATION_IDENTIFIER
'.status.jobStatus.upgradeSnapshotReference.name' "$snapshot" ${TIMEOUT} ||
exit 1
location=$(kubectl get flinksnp/$snapshot -o yaml | yq '.status.path')
if [ "$location" == "" ]; then echo "Upgrade savepoint location was empty";
exit 1; fi
-
+wait_for_status $APPLICATION_IDENTIFIER
'.status.jobStatus.upgradeSavepointPath' "$location" ${TIMEOUT} || exit 1
echo "Restarting deployment..."
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job":
{"state": "running" } } }'
diff --git a/examples/snapshot/job-from-savepoint.yaml
b/examples/snapshot/job-from-savepoint.yaml
index 698ad220..cc6627a0 100644
--- a/examples/snapshot/job-from-savepoint.yaml
+++ b/examples/snapshot/job-from-savepoint.yaml
@@ -64,6 +64,4 @@ spec:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 2
upgradeMode: savepoint
- flinkStateSnapshotReference:
- name: example-savepoint
- namespace: flink
+ initialSavepointPath:
file:///flink-data/savepoints/savepoint-45305c-d867504446e0
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkStateSnapshotReference.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkStateSnapshotReference.java
deleted file mode 100644
index 548692c9..00000000
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkStateSnapshotReference.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.kubernetes.operator.api.spec;
-
-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-/** Reference for a FlinkStateSnapshot. */
-@Experimental
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-@Builder
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class FlinkStateSnapshotReference {
-
- /** Namespace of the snapshot resource. */
- private String namespace;
-
- /** Name of the snapshot resource. */
- private String name;
-
- /**
- * If a path is given, all other fields will be ignored, and this will be
used as the initial
- * savepoint path.
- */
- private String path;
-
- public static FlinkStateSnapshotReference fromPath(String path) {
- return new FlinkStateSnapshotReference(null, null, path);
- }
-
- public static FlinkStateSnapshotReference fromResource(FlinkStateSnapshot
resource) {
- return new FlinkStateSnapshotReference(
- resource.getMetadata().getNamespace(),
resource.getMetadata().getName(), null);
- }
-}
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobReference.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobReference.java
index 068b0a64..856e4579 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobReference.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobReference.java
@@ -44,16 +44,9 @@ public class JobReference {
/** Name of the Flink resource. */
private String name;
- /**
- * Namespace of the Flink resource. If empty, the operator will use the
namespace of the
- * snapshot.
- */
- private String namespace;
-
public static JobReference fromFlinkResource(AbstractFlinkResource<?, ?>
flinkResource) {
var result = new JobReference();
result.setName(flinkResource.getMetadata().getName());
- result.setNamespace(flinkResource.getMetadata().getNamespace());
if (flinkResource instanceof FlinkDeployment) {
result.setKind(JobKind.FLINK_DEPLOYMENT);
@@ -71,6 +64,6 @@ public class JobReference {
} else if (kind == JobKind.FLINK_SESSION_JOB) {
kindString = CrdConstants.KIND_SESSION_JOB;
}
- return String.format("%s/%s (%s)", namespace, name, kindString);
+ return String.format("%s (%s)", name, kindString);
}
}
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java
index 1e845b13..a6c582cc 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java
@@ -72,16 +72,8 @@ public class JobSpec implements Diffable<JobSpec> {
* redeployments (triggered by changing the savepointRedeployNonce).
*/
@SpecDiff(DiffType.IGNORE)
- @Deprecated
private String initialSavepointPath;
- /**
- * Snapshot reference used by the job the first time it is deployed or
during savepoint
- * redeployments (triggered by changing the savepointRedeployNonce).
- */
- @SpecDiff(DiffType.IGNORE)
- private FlinkStateSnapshotReference flinkStateSnapshotReference;
-
/**
* Nonce used to manually trigger checkpoint for the running job. In order
to trigger a
* checkpoint, change the number to a different non-null value.
@@ -100,9 +92,8 @@ public class JobSpec implements Diffable<JobSpec> {
/**
* Nonce used to trigger a full redeployment of the job from the savepoint
path specified in
- * initialSavepointPath or the path/FlinkStateSnapshot reference in
flinkStateSnapshotReference.
- * In order to trigger redeployment, change the number to a different
non-null value. Rollback
- * is not possible after redeployment.
+ * initialSavepointPath. In order to trigger redeployment, change the
number to a different
+ * non-null value. Rollback is not possible after redeployment.
*/
@SpecDiff(value = DiffType.SAVEPOINT_REDEPLOY, onNullIgnore = true)
private Long savepointRedeployNonce;
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
index 28cfde87..6adef53c 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
@@ -18,7 +18,6 @@
package org.apache.flink.kubernetes.operator.api.status;
import org.apache.flink.annotation.Experimental;
-import
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.fabric8.crd.generator.annotation.PrinterColumn;
@@ -51,7 +50,7 @@ public class JobStatus {
/** Update time of the job. */
private String updateTime;
- private FlinkStateSnapshotReference upgradeSnapshotReference;
+ private String upgradeSavepointPath;
/** Information about pending and last savepoint for the job. */
@Deprecated private SavepointInfo savepointInfo = new SavepointInfo();
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
index 862a5dd6..59272af6 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
@@ -23,7 +23,6 @@ import org.apache.flink.autoscaler.utils.DateTimeUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
-import
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import org.apache.flink.kubernetes.operator.api.status.Checkpoint;
import org.apache.flink.kubernetes.operator.api.status.CheckpointInfo;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
@@ -454,10 +453,7 @@ public class SnapshotObserver<
flinkService
.getLastCheckpoint(JobID.fromHexString(jobID),
observeConfig)
.ifPresent(
- snapshot ->
- jobStatus.setUpgradeSnapshotReference(
-
FlinkStateSnapshotReference.fromPath(
- snapshot.getLocation())));
+ snapshot ->
jobStatus.setUpgradeSavepointPath(snapshot.getLocation()));
} catch (Exception e) {
LOG.error("Could not observe latest checkpoint information.", e);
throw new ReconciliationException(e);
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 8f2c1a7e..cc57889b 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -27,7 +27,6 @@ import
org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.diff.DiffType;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
-import
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
@@ -41,7 +40,6 @@ import
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.diff.DiffResult;
import
org.apache.flink.kubernetes.operator.reconciler.diff.ReflectiveDiffBuilder;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -122,7 +120,7 @@ public abstract class AbstractFlinkResourceReconciler<
updateStatusBeforeFirstDeployment(
cr, spec, deployConfig, status, ctx.getKubernetesClient());
- deploy(ctx, spec, deployConfig, getInitialSnapshotPath(ctx, spec),
false);
+ deploy(ctx, spec, deployConfig, getInitialSnapshotPath(spec),
false);
ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig,
clock);
return;
@@ -178,19 +176,11 @@ public abstract class AbstractFlinkResourceReconciler<
}
}
- private Optional<String> getInitialSnapshotPath(
- FlinkResourceContext<CR> ctx, AbstractFlinkSpec spec) {
+ private Optional<String> getInitialSnapshotPath(AbstractFlinkSpec spec) {
if (spec.getJob() == null) {
return Optional.empty();
}
- if (spec.getJob().getFlinkStateSnapshotReference() != null) {
- return Optional.of(
- FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(
- ctx.getKubernetesClient(),
- spec.getJob().getFlinkStateSnapshotReference()));
- }
-
return Optional.ofNullable(spec.getJob().getInitialSavepointPath());
}
@@ -233,16 +223,10 @@ public abstract class AbstractFlinkResourceReconciler<
CR cr, SPEC spec, Configuration deployConfig, STATUS status,
KubernetesClient client) {
if (spec.getJob() != null) {
var initialUpgradeMode = UpgradeMode.STATELESS;
- var snapshotRef = spec.getJob().getFlinkStateSnapshotReference();
var initialSp = spec.getJob().getInitialSavepointPath();
- if (snapshotRef != null) {
- status.getJobStatus().setUpgradeSnapshotReference(snapshotRef);
- initialUpgradeMode = UpgradeMode.SAVEPOINT;
- } else if (initialSp != null) {
- status.getJobStatus()
- .setUpgradeSnapshotReference(
-
FlinkStateSnapshotReference.fromPath(initialSp));
+ if (initialSp != null) {
+ status.getJobStatus().setUpgradeSavepointPath(initialSp);
initialUpgradeMode = UpgradeMode.SAVEPOINT;
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 7d8a15b0..babb05f8 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.diff.DiffType;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
-import
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
@@ -48,7 +47,6 @@ import org.apache.flink.util.Preconditions;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import lombok.Value;
-import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -288,15 +286,7 @@ public abstract class AbstractJobReconciler<
if (spec.getJob().getUpgradeMode() == UpgradeMode.SAVEPOINT) {
savepointOpt =
Optional.ofNullable(
- ctx.getResource()
- .getStatus()
- .getJobStatus()
- .getUpgradeSnapshotReference())
- .map(
- ref ->
- FlinkStateSnapshotUtils
-
.getValidatedFlinkStateSnapshotPath(
-
ctx.getKubernetesClient(), ref));
+
ctx.getResource().getStatus().getJobStatus().getUpgradeSavepointPath());
if (savepointOpt.isEmpty()) {
savepointOpt =
Optional.ofNullable(
@@ -313,28 +303,26 @@ public abstract class AbstractJobReconciler<
}
/**
- * Updates the upgrade snapshot reference field in the JobSpec of the
current Flink resource. If
- * snapshot resources are enabled, a new FlinkStateSnapshot will be
created, else it will only
- * set the path field of the snapshot reference.
+ * Updates the upgrade savepoint field in the JobSpec of the current Flink
resource and if
+ * snapshot resources are enabled, a new FlinkStateSnapshot will be
created.
*
* @param ctx context
* @param savepointLocation location of savepoint taken
*/
- protected void setUpgradeSnapshotReferenceFromSavepoint(
- FlinkResourceContext<?> ctx, String savepointLocation) {
- var conf = ObjectUtils.firstNonNull(ctx.getObserveConfig(), new
Configuration());
+ protected void setUpgradeSavepointPath(FlinkResourceContext<?> ctx, String
savepointLocation) {
+ var conf = ctx.getObserveConfig();
var savepointFormatType =
-
conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
-
- var snapshotRef =
- FlinkStateSnapshotUtils.createReferenceForUpgradeSavepoint(
- conf,
- ctx.getOperatorConfig(),
- ctx.getKubernetesClient(),
- ctx.getResource(),
-
SavepointFormatType.valueOf(savepointFormatType.name()),
- savepointLocation);
-
ctx.getResource().getStatus().getJobStatus().setUpgradeSnapshotReference(snapshotRef);
+ ctx.getObserveConfig()
+
.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
+
+ FlinkStateSnapshotUtils.createUpgradeSnapshotResource(
+ conf,
+ ctx.getOperatorConfig(),
+ ctx.getKubernetesClient(),
+ ctx.getResource(),
+ SavepointFormatType.valueOf(savepointFormatType.name()),
+ savepointLocation);
+
ctx.getResource().getStatus().getJobStatus().setUpgradeSavepointPath(savepointLocation);
}
/**
@@ -471,11 +459,11 @@ public abstract class AbstractJobReconciler<
LOG.info("Resubmitting Flink job...");
SPEC specToRecover =
ReconciliationUtils.getDeployedSpec(ctx.getResource());
- var upgradeSnapshotRef =
-
ctx.getResource().getStatus().getJobStatus().getUpgradeSnapshotReference();
+ var upgradeStatePath =
+
ctx.getResource().getStatus().getJobStatus().getUpgradeSavepointPath();
var savepointLegacy =
ctx.getResource().getStatus().getJobStatus().getSavepointInfo().getLastSavepoint();
- var lastSavepointKnown = upgradeSnapshotRef != null || savepointLegacy
!= null;
+ var lastSavepointKnown = upgradeStatePath != null || savepointLegacy
!= null;
if (requireHaMetadata) {
specToRecover.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
@@ -498,21 +486,9 @@ public abstract class AbstractJobReconciler<
cancelJob(ctx, UpgradeMode.STATELESS);
currentDeploySpec.getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
- var snapshotRef =
currentDeploySpec.getJob().getFlinkStateSnapshotReference();
- var initialSavepointPath =
currentDeploySpec.getJob().getInitialSavepointPath();
-
- if (snapshotRef == null && initialSavepointPath != null) {
- snapshotRef =
FlinkStateSnapshotReference.fromPath(initialSavepointPath);
- }
-
- Optional<String> savepointPath = Optional.empty();
- if (snapshotRef != null) {
- savepointPath =
- Optional.of(
-
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(
- ctx.getKubernetesClient(), snapshotRef));
- status.getJobStatus().setUpgradeSnapshotReference(snapshotRef);
- }
+ Optional<String> savepointPath =
+
Optional.ofNullable(currentDeploySpec.getJob().getInitialSavepointPath());
+
status.getJobStatus().setUpgradeSavepointPath(savepointPath.orElse(null));
if (desiredJobState == JobState.RUNNING) {
deploy(
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 34406b9d..ac6c69e9 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -26,7 +26,6 @@ import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
-import
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
@@ -167,9 +166,7 @@ public class ApplicationReconciler
// Last state deployment, explicitly set a dummy savepoint path to
avoid accidental
// incorrect state restore in case the HA metadata is deleted by
the user
deployConfig.set(SavepointConfigOptions.SAVEPOINT_PATH,
LAST_STATE_DUMMY_SP_PATH);
- status.getJobStatus()
- .setUpgradeSnapshotReference(
-
FlinkStateSnapshotReference.fromPath(LAST_STATE_DUMMY_SP_PATH));
+
status.getJobStatus().setUpgradeSavepointPath(LAST_STATE_DUMMY_SP_PATH);
} else {
// Stateless deployment, remove any user configured savepoint path
deployConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
@@ -242,7 +239,7 @@ public class ApplicationReconciler
var conf = ObjectUtils.firstNonNull(ctx.getObserveConfig(), new
Configuration());
ctx.getFlinkService()
.cancelJob(ctx.getResource(), upgradeMode, conf)
- .ifPresent(location ->
setUpgradeSnapshotReferenceFromSavepoint(ctx, location));
+ .ifPresent(location -> setUpgradeSavepointPath(ctx, location));
}
@Override
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index dcf781c3..3f54a8c7 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -104,7 +104,7 @@ public class SessionJobReconciler
ctx.getFlinkService()
.cancelSessionJob(ctx.getResource(), upgradeMode, conf)
- .ifPresent(location ->
setUpgradeSnapshotReferenceFromSavepoint(ctx, location));
+ .ifPresent(location -> setUpgradeSavepointPath(ctx, location));
ctx.getResource().getStatus().getJobStatus().setJobId(null);
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
index 018a080b..ecc6c5c9 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
@@ -41,7 +41,6 @@ import
io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -72,13 +71,10 @@ public class EventSourceUtils {
if (jobRef == null || jobRef.getName() ==
null) {
return Collections.emptySet();
}
- var namespace =
-
Optional.ofNullable(jobRef.getNamespace())
-
.orElse(snapshot.getMetadata().getNamespace());
return Set.of(
new ResourceID(
snapshot.getSpec().getJobReference().getName(),
- namespace));
+
snapshot.getMetadata().getNamespace()));
})
.withNamespacesInheritedFromController(context)
.followNamespaceChanges(true)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java
index fa542478..5cfbcb1e 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java
@@ -23,7 +23,6 @@ import
org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.CrdConstants;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec;
-import
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobReference;
import org.apache.flink.kubernetes.operator.api.spec.SavepointSpec;
@@ -39,7 +38,6 @@ import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import org.apache.commons.lang3.ObjectUtils;
-import org.apache.commons.lang3.StringUtils;
import javax.annotation.Nullable;
@@ -59,71 +57,6 @@ import static
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEP
/** Utilities class for FlinkStateSnapshot resources. */
public class FlinkStateSnapshotUtils {
- /**
- * From a snapshot reference, return its snapshot path. If a {@link
FlinkStateSnapshot} is
- * referenced, it will be retrieved from Kubernetes.
- *
- * @param kubernetesClient kubernetes client
- * @param snapshotRef snapshot reference
- * @return found savepoint path
- */
- public static String getValidatedFlinkStateSnapshotPath(
- KubernetesClient kubernetesClient, FlinkStateSnapshotReference
snapshotRef) {
- if (StringUtils.isNotBlank(snapshotRef.getPath())) {
- return snapshotRef.getPath();
- }
-
- if (StringUtils.isBlank(snapshotRef.getName())) {
- throw new IllegalArgumentException(
- String.format("Invalid snapshot name: %s",
snapshotRef.getName()));
- }
-
- var result =
- snapshotRef.getNamespace() == null
- ? kubernetesClient
- .resources(FlinkStateSnapshot.class)
- .withName(snapshotRef.getName())
- .get()
- : kubernetesClient
- .resources(FlinkStateSnapshot.class)
- .inNamespace(snapshotRef.getNamespace())
- .withName(snapshotRef.getName())
- .get();
-
- if (result == null) {
- throw new IllegalStateException(
- String.format(
- "Cannot find snapshot %s in namespace %s.",
- snapshotRef.getNamespace(),
snapshotRef.getName()));
- }
-
- // We can return the savepoint path if it's marked as completed
without waiting for the
- // reconciler to update its status.
- if (result.getSpec().isSavepoint() &&
result.getSpec().getSavepoint().getAlreadyExists()) {
- var path = result.getSpec().getSavepoint().getPath();
- if (!StringUtils.isBlank(path)) {
- return path;
- }
- }
-
- if (COMPLETED != result.getStatus().getState()) {
- throw new IllegalStateException(
- String.format(
- "Snapshot %s/%s is not complete yet.",
- snapshotRef.getNamespace(),
snapshotRef.getName()));
- }
-
- var path = result.getStatus().getPath();
- if (StringUtils.isBlank(path)) {
- throw new IllegalStateException(
- String.format(
- "Snapshot %s/%s path is incorrect: %s.",
- snapshotRef.getNamespace(), snapshotRef.getName(),
path));
- }
-
- return path;
- }
-
protected static FlinkStateSnapshot createFlinkStateSnapshot(
KubernetesClient kubernetesClient,
String namespace,
@@ -263,9 +196,7 @@ public class FlinkStateSnapshotUtils {
}
/**
- * For an upgrade savepoint, create a {@link FlinkStateSnapshot} on the
Kubernetes cluster and
- * return its reference if snapshot resources are enabled. In other case
return a reference
- * containing only the path.
+ * For an upgrade savepoint, create a {@link FlinkStateSnapshot} on the
Kubernetes cluster.
*
* @param conf job configuration
* @param operatorConf operator configuration
@@ -273,9 +204,9 @@ public class FlinkStateSnapshotUtils {
* @param flinkResource referenced Flink resource
* @param savepointFormatType savepoint format type
* @param savepointPath path of savepoint
- * @return reference for snapshot
+ * @return State snapshot resource
*/
- public static FlinkStateSnapshotReference
createReferenceForUpgradeSavepoint(
+ public static Optional<FlinkStateSnapshot> createUpgradeSnapshotResource(
Configuration conf,
FlinkOperatorConfiguration operatorConf,
KubernetesClient kubernetesClient,
@@ -283,12 +214,12 @@ public class FlinkStateSnapshotUtils {
SavepointFormatType savepointFormatType,
String savepointPath) {
if (!isSnapshotResourceEnabled(operatorConf, conf)) {
- return FlinkStateSnapshotReference.fromPath(savepointPath);
+ return Optional.empty();
}
var disposeOnDelete =
conf.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE);
- var snapshot =
+ var savepointResource =
createSavepointResource(
kubernetesClient,
flinkResource,
@@ -296,7 +227,7 @@ public class FlinkStateSnapshotUtils {
SnapshotTriggerType.UPGRADE,
savepointFormatType,
disposeOnDelete);
- return FlinkStateSnapshotReference.fromResource(snapshot);
+ return Optional.of(savepointResource);
}
/**
@@ -439,9 +370,8 @@ public class FlinkStateSnapshotUtils {
* @return namespace with the job reference to be found in
*/
public static ResourceID
getSnapshotJobReferenceResourceId(FlinkStateSnapshot snapshot) {
- var namespace =
-
Optional.ofNullable(snapshot.getSpec().getJobReference().getNamespace())
- .orElse(snapshot.getMetadata().getNamespace());
- return new ResourceID(snapshot.getSpec().getJobReference().getName(),
namespace);
+ return new ResourceID(
+ snapshot.getSpec().getJobReference().getName(),
+ snapshot.getMetadata().getNamespace());
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
index b3d4dbde..f8ce07f4 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
@@ -24,6 +24,7 @@ import
org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
+import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SnapshotInfo;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
@@ -32,6 +33,7 @@ import
org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
import
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.core.util.CronExpression;
import org.slf4j.Logger;
@@ -360,25 +362,17 @@ public class SnapshotUtils {
* @return True if last savepoint is known
*/
public static boolean lastSavepointKnown(CommonStatus<?> status) {
- var lastSavepoint =
status.getJobStatus().getUpgradeSnapshotReference();
-
- if (lastSavepoint != null) {
- if (StringUtils.isNotBlank(lastSavepoint.getName())) {
- return true;
- }
-
- var location = lastSavepoint.getPath();
- return location != null
- &&
!location.equals(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
- }
-
- // Check legacy savepoint field too
- var lastSavepointLegacy =
status.getJobStatus().getSavepointInfo().getLastSavepoint();
- if (lastSavepointLegacy == null) {
+ var location =
+ ObjectUtils.firstNonNull(
+ status.getJobStatus().getUpgradeSavepointPath(),
+ Optional.ofNullable(
+
status.getJobStatus().getSavepointInfo().getLastSavepoint())
+ .map(Savepoint::getLocation)
+ .orElse(null));
+
+ if (location == null) {
return true;
}
- return !lastSavepointLegacy
- .getLocation()
- .equals(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
+ return
!location.equals(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 994cce8d..b326e96b 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -294,12 +294,6 @@ public class DefaultValidator implements
FlinkResourceValidator {
return Optional.of("Job parallelism must be larger than 0");
}
- if (!StringUtils.isNullOrWhitespaceOnly(job.getInitialSavepointPath())
- && job.getFlinkStateSnapshotReference() != null) {
- return Optional.of(
- "Cannot set both initialSavepointPath and
flinkStateSnapshotReference in the job spec");
- }
-
return Optional.empty();
}
@@ -474,10 +468,9 @@ public class DefaultValidator implements
FlinkResourceValidator {
if (newJob.getSavepointRedeployNonce() != null
&& !newJob.getSavepointRedeployNonce()
.equals(oldJob.getSavepointRedeployNonce())) {
- if
(StringUtils.isNullOrWhitespaceOnly(newJob.getInitialSavepointPath())
- && newJob.getFlinkStateSnapshotReference() == null) {
+ if
(StringUtils.isNullOrWhitespaceOnly(newJob.getInitialSavepointPath())) {
return Optional.of(
- "InitialSavepointPath and
flinkStateSnapshotReference must not be empty for savepoint redeployment");
+ "InitialSavepointPath must not be empty for
savepoint redeployment");
}
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 1bb04216..32e49502 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -25,7 +25,6 @@ import
org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
-import
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
@@ -397,9 +396,7 @@ public class FlinkDeploymentControllerTest {
new TaskManagerInfo(
"component=taskmanager,app=" +
appCluster.getMetadata().getName(), 1),
appCluster.getStatus().getTaskManager());
- assertEquals(
- FlinkStateSnapshotReference.fromPath("s0"),
-
appCluster.getStatus().getJobStatus().getUpgradeSnapshotReference());
+ assertEquals("s0",
appCluster.getStatus().getJobStatus().getUpgradeSavepointPath());
var previousJobs = new ArrayList<>(jobs);
appCluster.getSpec().getJob().setInitialSavepointPath("s1");
@@ -407,9 +404,7 @@ public class FlinkDeploymentControllerTest {
// Send in a no-op change
testController.reconcile(appCluster, context);
assertEquals(previousJobs, new ArrayList<>(flinkService.listJobs()));
- assertEquals(
- FlinkStateSnapshotReference.fromPath("s0"),
-
appCluster.getStatus().getJobStatus().getUpgradeSnapshotReference());
+ assertEquals("s0",
appCluster.getStatus().getJobStatus().getUpgradeSavepointPath());
// Upgrade job
appCluster.getSpec().getJob().setParallelism(100);
@@ -425,8 +420,7 @@ public class FlinkDeploymentControllerTest {
.getState());
assertEquals(new TaskManagerInfo("", 0),
appCluster.getStatus().getTaskManager());
assertEquals(
- FlinkStateSnapshotReference.fromPath("savepoint_0"),
-
appCluster.getStatus().getJobStatus().getUpgradeSnapshotReference());
+ "savepoint_0",
appCluster.getStatus().getJobStatus().getUpgradeSavepointPath());
testController.reconcile(appCluster, context);
jobs = flinkService.listJobs();
@@ -441,8 +435,7 @@ public class FlinkDeploymentControllerTest {
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- FlinkStateSnapshotReference.fromPath("savepoint_1"),
-
appCluster.getStatus().getJobStatus().getUpgradeSnapshotReference());
+ "savepoint_1",
appCluster.getStatus().getJobStatus().getUpgradeSavepointPath());
// Resume from last savepoint
appCluster.getSpec().getJob().setState(JobState.RUNNING);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
index fb916b66..ba6af301 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
-import
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
@@ -193,9 +192,7 @@ class FlinkSessionJobControllerTest {
var jobs = flinkService.listJobs();
assertEquals(1, jobs.size());
assertEquals("s0", jobs.get(0).f0);
- assertEquals(
- FlinkStateSnapshotReference.fromPath("s0"),
-
sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference());
+ assertEquals("s0",
sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
var previousJobs = new ArrayList<>(jobs);
sessionJob.getSpec().getJob().setInitialSavepointPath("s1");
@@ -203,16 +200,13 @@ class FlinkSessionJobControllerTest {
// Send in a no-op change
testController.reconcile(sessionJob, context);
assertEquals(previousJobs, new ArrayList<>(flinkService.listJobs()));
- assertEquals(
- FlinkStateSnapshotReference.fromPath("s0"),
-
sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference());
+ assertEquals("s0",
sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
// Upgrade job
sessionJob.getSpec().getJob().setParallelism(100);
updateControl = testController.reconcile(sessionJob, context);
assertEquals(
- FlinkStateSnapshotReference.fromPath("savepoint_0"),
-
sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference());
+ "savepoint_0",
sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
assertEquals(0L, updateControl.getScheduleDelay().get());
assertEquals(
@@ -232,8 +226,7 @@ class FlinkSessionJobControllerTest {
assertEquals("savepoint_0", jobs.get(0).f0);
testController.reconcile(sessionJob, context);
assertEquals(
- FlinkStateSnapshotReference.fromPath("savepoint_0"),
-
sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference());
+ "savepoint_0",
sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
// Suspend job
sessionJob.getSpec().getJob().setState(JobState.SUSPENDED);
@@ -247,8 +240,7 @@ class FlinkSessionJobControllerTest {
assertEquals(1, jobs.size());
assertEquals("savepoint_1", jobs.get(0).f0);
assertEquals(
- FlinkStateSnapshotReference.fromPath("savepoint_1"),
-
sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference());
+ "savepoint_1",
sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
testController.reconcile(sessionJob, context);
testController.cleanup(sessionJob, context);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
index 6c4bd57a..81cf121b 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
@@ -514,10 +514,8 @@ public class FlinkStateSnapshotControllerTest {
var snapshot = createSavepoint(deployment);
var errorMessage =
String.format(
- "Secondary resource %s/%s (%s) for savepoint
snapshot-test was not found",
- deployment.getMetadata().getNamespace(),
- deployment.getMetadata().getName(),
- CrdConstants.KIND_FLINK_DEPLOYMENT);
+ "Secondary resource %s (%s) for savepoint
snapshot-test was not found",
+ deployment.getMetadata().getName(),
CrdConstants.KIND_FLINK_DEPLOYMENT);
// First reconcile will trigger the snapshot.
controller.reconcile(snapshot, TestUtils.createSnapshotContext(client,
deployment));
@@ -558,10 +556,8 @@ public class FlinkStateSnapshotControllerTest {
var snapshot = createSavepoint(deployment);
var errorMessage =
String.format(
- "Secondary resource %s/%s (%s) for savepoint
snapshot-test is not running",
- deployment.getMetadata().getNamespace(),
- deployment.getMetadata().getName(),
- CrdConstants.KIND_FLINK_DEPLOYMENT);
+ "Secondary resource %s (%s) for savepoint
snapshot-test is not running",
+ deployment.getMetadata().getName(),
CrdConstants.KIND_FLINK_DEPLOYMENT);
controller.reconcile(snapshot, context);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 7cf1b749..9f79c407 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -493,9 +493,7 @@ public class ApplicationObserverTest extends
OperatorTestBase {
assertEquals(
org.apache.flink.api.common.JobStatus.FAILED.name(),
deployment.getStatus().getJobStatus().getState());
- assertEquals(
- "last-SP",
-
deployment.getStatus().getJobStatus().getUpgradeSnapshotReference().getPath());
+ assertEquals("last-SP",
deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
assertFalse(SnapshotUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
observer.observe(deployment, readyContext);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 51f90fb8..6a5f60ad 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -40,7 +40,6 @@ import org.apache.flink.kubernetes.operator.api.CrdConstants;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
-import
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobReference;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
@@ -174,7 +173,7 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
// clean up
assertNull(deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());
-
assertNull(deployment.getStatus().getJobStatus().getUpgradeSnapshotReference());
+
assertNull(deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
reconciler.cleanup(
deployment,
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
@@ -185,11 +184,11 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
assertThat(snapshot.getSpec().getSavepoint().getPath())
.isEqualTo("savepoint_0");
assertEquals(
-
FlinkStateSnapshotReference.fromResource(snapshot),
+ "savepoint_0",
deployment
.getStatus()
.getJobStatus()
- .getUpgradeSnapshotReference());
+ .getUpgradeSavepointPath());
});
}
@@ -209,7 +208,7 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
// clean up
-
assertNull(deployment.getStatus().getJobStatus().getUpgradeSnapshotReference());
+
assertNull(deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
reconciler.cleanup(
deployment,
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
@@ -220,11 +219,11 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
assertThat(snapshot.getSpec().getSavepoint().getPath())
.isEqualTo("savepoint_0");
assertEquals(
-
FlinkStateSnapshotReference.fromResource(snapshot),
+ "savepoint_0",
deployment
.getStatus()
.getJobStatus()
- .getUpgradeSnapshotReference());
+ .getUpgradeSavepointPath());
});
}
@@ -554,7 +553,7 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
verifyAndSetRunningJobsToStatus(deployment, runningJobs);
assertFalse(isSnapshotInProgress.test(getJobStatus(deployment)));
assertNull(getSnapshotInfo.apply(deployment).getLastSnapshot());
-
assertNull(deployment.getStatus().getJobStatus().getUpgradeSnapshotReference());
+
assertNull(deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
assertNull(getLastSnapshotStatus(deployment, snapshotType));
FlinkDeployment snDeployment = ReconciliationUtils.clone(deployment);
@@ -563,7 +562,7 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
reconciler.reconcile(snDeployment, context);
assertFalse(isSnapshotInProgress.test((getJobStatus(snDeployment))));
assertNull(getSnapshotInfo.apply(deployment).getLastSnapshot());
-
assertNull(deployment.getStatus().getJobStatus().getUpgradeSnapshotReference());
+
assertNull(deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
assertNull(getLastSnapshotStatus(snDeployment, snapshotType));
// trigger when nonce is defined
@@ -1199,11 +1198,11 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
assertThat(snapshot.getSpec().getSavepoint().getPath())
.isEqualTo("savepoint_0");
assertEquals(
-
FlinkStateSnapshotReference.fromResource(snapshot),
+ "savepoint_0",
deployment
.getStatus()
.getJobStatus()
- .getUpgradeSnapshotReference());
+ .getUpgradeSavepointPath());
});
}
@@ -1376,7 +1375,7 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
status.getJobManagerDeploymentStatus());
// Verify that savepoint and upgrade mode is recorded correctly in
reconciled spec
- assertEquals(savepoint,
status.getJobStatus().getUpgradeSnapshotReference().getPath());
+ assertEquals(savepoint,
status.getJobStatus().getUpgradeSavepointPath());
assertEquals(
UpgradeMode.SAVEPOINT,
status.getReconciliationStatus()
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index e4117659..465c3cf6 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.CrdConstants;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
-import
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
@@ -45,7 +44,6 @@ import
org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
-import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -178,38 +176,25 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
snapshots.get(0).getMetadata().getLabels().get(CrdConstants.LABEL_SNAPSHOT_TYPE));
}
- private static Stream<Arguments> flinkVersionsWithLegacyMode() {
- return TestUtils.flinkVersions()
- .flatMap(
- arg ->
- Stream.of(
- Arguments.of(arg.get()[0], true),
- Arguments.of(arg.get()[0], false)));
- }
-
@ParameterizedTest
- @MethodSource("flinkVersionsWithLegacyMode")
- public void testUpgradeFromStatelessToLastState(
- FlinkVersion flinkVersion, boolean legacySnapshots) throws
Exception {
- testUpgradeToLastState(flinkVersion, UpgradeMode.STATELESS,
legacySnapshots);
+
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+ public void testUpgradeFromStatelessToLastState(FlinkVersion flinkVersion)
throws Exception {
+ testUpgradeToLastState(flinkVersion, UpgradeMode.STATELESS);
}
@ParameterizedTest
- @MethodSource("flinkVersionsWithLegacyMode")
- public void testUpgradeFromSavepointToLastState(
- FlinkVersion flinkVersion, boolean legacySnapshots) throws
Exception {
- testUpgradeToLastState(flinkVersion, UpgradeMode.SAVEPOINT,
legacySnapshots);
+
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+ public void testUpgradeFromSavepointToLastState(FlinkVersion flinkVersion)
throws Exception {
+ testUpgradeToLastState(flinkVersion, UpgradeMode.SAVEPOINT);
}
@ParameterizedTest
- @MethodSource("flinkVersionsWithLegacyMode")
- public void testUpgradeFromLastStateToLastState(
- FlinkVersion flinkVersion, boolean legacySnapshots) throws
Exception {
- testUpgradeToLastState(flinkVersion, UpgradeMode.LAST_STATE,
legacySnapshots);
+
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+ public void testUpgradeFromLastStateToLastState(FlinkVersion flinkVersion)
throws Exception {
+ testUpgradeToLastState(flinkVersion, UpgradeMode.LAST_STATE);
}
- private void testUpgradeToLastState(
- FlinkVersion flinkVersion, UpgradeMode fromUpgradeMode, boolean
legacySnapshots)
+ private void testUpgradeToLastState(FlinkVersion flinkVersion, UpgradeMode
fromUpgradeMode)
throws Exception {
FlinkDeployment deployment = buildApplicationCluster(flinkVersion,
fromUpgradeMode);
@@ -250,27 +235,7 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
deployment.getSpec().setRestartNonce(200L);
flinkService.setHaDataAvailable(false);
- if (legacySnapshots) {
- deployment
- .getStatus()
- .getJobStatus()
- .setUpgradeSnapshotReference(
-
FlinkStateSnapshotReference.fromPath("finished_sp"));
- } else {
- var snapshot =
- FlinkStateSnapshotUtils.createSavepointResource(
- kubernetesClient,
- deployment,
- "finished_sp",
- SnapshotTriggerType.UPGRADE,
- SavepointFormatType.CANONICAL,
- false);
- deployment
- .getStatus()
- .getJobStatus()
- .setUpgradeSnapshotReference(
-
FlinkStateSnapshotReference.fromResource(snapshot));
- }
+
deployment.getStatus().getJobStatus().setUpgradeSavepointPath("finished_sp");
deployment.getStatus().getJobStatus().setState("FINISHED");
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
deployment
@@ -308,11 +273,7 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
SavepointFormatType.CANONICAL,
0L));
} else {
- deployment
- .getStatus()
- .getJobStatus()
- .setUpgradeSnapshotReference(
-
FlinkStateSnapshotReference.fromPath(savepointPath));
+
deployment.getStatus().getJobStatus().setUpgradeSavepointPath(savepointPath);
deployment
.getStatus()
.getJobStatus()
@@ -461,8 +422,7 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
if (initSavepoint) {
assertEquals("init-sp", flinkService.listJobs().get(0).f0);
assertEquals(
- "init-sp",
-
deployment.getStatus().getJobStatus().getUpgradeSnapshotReference().getPath());
+ "init-sp",
deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
assertEquals(UpgradeMode.SAVEPOINT,
lastReconciledSpec.getJob().getUpgradeMode());
} else {
assertNull(flinkService.listJobs().get(0).f0);
@@ -676,12 +636,12 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
if (checkpointAvailable) {
assertEquals(
ApplicationReconciler.LAST_STATE_DUMMY_SP_PATH,
-
deployment.getStatus().getJobStatus().getUpgradeSnapshotReference().getPath());
+
deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
assertEquals(
ApplicationReconciler.LAST_STATE_DUMMY_SP_PATH,
flinkService.listJobs().get(0).f0);
} else {
-
assertNull(deployment.getStatus().getJobStatus().getUpgradeSnapshotReference());
+
assertNull(deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
assertNull(flinkService.listJobs().get(0).f0);
}
}
@@ -798,7 +758,7 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
// trigger a savepoint
assertEquals(
ApplicationReconciler.LAST_STATE_DUMMY_SP_PATH,
-
deployment.getStatus().getJobStatus().getUpgradeSnapshotReference().getPath());
+
deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
assertEquals(
ApplicationReconciler.LAST_STATE_DUMMY_SP_PATH,
flinkService.listJobs().get(0).f0);
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index 88b4cf5b..6ab77705 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.CrdConstants;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
-import
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
@@ -130,8 +129,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
if (legacySnapshots) {
assertEquals(
- "savepoint_0",
-
sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference().getPath());
+ "savepoint_0",
sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
} else {
var snapshots =
TestUtils.getFlinkStateSnapshotsForResource(
@@ -139,8 +137,8 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
assertThat(snapshots).isNotEmpty();
assertEquals("savepoint_0",
snapshots.get(0).getSpec().getSavepoint().getPath());
assertEquals(
- FlinkStateSnapshotReference.fromResource(snapshots.get(0)),
-
sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference());
+ snapshots.get(0).getSpec().getSavepoint().getPath(),
+
sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
}
}
@@ -173,8 +171,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
sessionJob,
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
if (legacySnapshots) {
assertEquals(
- "savepoint_0",
-
sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference().getPath());
+ "savepoint_0",
sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
} else {
var snapshots =
TestUtils.getFlinkStateSnapshotsForResource(
@@ -182,8 +179,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
assertThat(snapshots).isNotEmpty();
assertEquals("savepoint_0",
snapshots.get(0).getSpec().getSavepoint().getPath());
assertEquals(
- FlinkStateSnapshotReference.fromResource(snapshots.get(0)),
-
sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference());
+ "savepoint_0",
sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
}
}
@@ -412,11 +408,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
if (legacySnapshots) {
assertEquals(
"savepoint_0",
- statefulSessionJob
- .getStatus()
- .getJobStatus()
- .getUpgradeSnapshotReference()
- .getPath());
+
statefulSessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
} else {
var snapshots =
TestUtils.getFlinkStateSnapshotsForResource(
@@ -430,8 +422,8 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
.getLabels()
.get(CrdConstants.LABEL_SNAPSHOT_TYPE));
assertEquals(
- FlinkStateSnapshotReference.fromResource(snapshots.get(0)),
-
statefulSessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference());
+ snapshots.get(0).getSpec().getSavepoint().getPath(),
+
statefulSessionJob.getStatus().getJobStatus().getUpgradeSavepointPath());
}
flinkService.clear();
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index f9b4781c..3cfa562b 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -302,7 +302,7 @@ public class AbstractFlinkServiceTest {
assertTrue(cancelFuture.isDone());
assertEquals(jobID, cancelFuture.get());
assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
- assertNull(jobStatus.getUpgradeSnapshotReference());
+ assertNull(jobStatus.getUpgradeSavepointPath());
}
@ParameterizedTest
@@ -541,7 +541,7 @@ public class AbstractFlinkServiceTest {
configManager.getObserveConfig(deployment),
false);
assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
- assertNull(jobStatus.getUpgradeSnapshotReference());
+ assertNull(jobStatus.getUpgradeSavepointPath());
}
@Test
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
index fd136969..1a3dc8e6 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.CrdConstants;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
-import
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobKind;
import org.apache.flink.kubernetes.operator.api.spec.JobReference;
@@ -31,7 +30,6 @@ import
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
import
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -47,14 +45,10 @@ import java.util.stream.Collectors;
import static org.apache.flink.kubernetes.operator.TestUtils.reconcileSpec;
import static
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
import static
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.IN_PROGRESS;
-import static
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.TRIGGER_PENDING;
import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE;
-import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** Tests for {@link FlinkStateSnapshotUtils}. */
@@ -62,8 +56,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class FlinkStateSnapshotUtilsTest {
private KubernetesClient client;
-
- private final FlinkConfigManager configManager = new
FlinkConfigManager(new Configuration());
private static final String NAMESPACE = "test";
private static final String SAVEPOINT_NAME = "savepoint-01";
private static final String SAVEPOINT_PATH = "/tmp/savepoint-01";
@@ -98,87 +90,6 @@ public class FlinkStateSnapshotUtilsTest {
.isEqualTo(SnapshotTriggerType.PERIODIC);
}
- @Test
- public void testGetValidatedFlinkStateSnapshotPathPathGiven() {
- var snapshotRef =
FlinkStateSnapshotReference.builder().path(SAVEPOINT_PATH).build();
- var snapshotResult =
-
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(client, snapshotRef);
- assertEquals(SAVEPOINT_PATH, snapshotResult);
- }
-
- @Test
- public void testGetValidatedFlinkStateSnapshotPathFoundResource() {
- var snapshot = initSavepoint(COMPLETED, null);
- client.resource(snapshot).create();
-
- var snapshotRef =
- FlinkStateSnapshotReference.builder()
- .namespace(NAMESPACE)
- .name(SAVEPOINT_NAME)
- .build();
- var snapshotResult =
-
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(client, snapshotRef);
- assertEquals(SAVEPOINT_PATH, snapshotResult);
- }
-
- @Test
- public void testGetValidatedFlinkStateSnapshotPathInvalidName() {
- var snapshotRef =
-
FlinkStateSnapshotReference.builder().namespace(NAMESPACE).name(" ").build();
- assertThrows(
- IllegalArgumentException.class,
- () ->
-
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(
- client, snapshotRef));
- }
-
- @Test
- public void testGetValidatedFlinkStateSnapshotPathNotFound() {
- var snapshotRef =
- FlinkStateSnapshotReference.builder()
- .namespace("not-exists")
- .name("not-exists")
- .build();
- assertThrows(
- IllegalStateException.class,
- () ->
-
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(
- client, snapshotRef));
- }
-
- @Test
- public void testGetAndValidateFlinkStateSnapshotAlreadyExists() {
- var snapshot = initSavepoint(TRIGGER_PENDING, null);
- snapshot.getSpec().getSavepoint().setAlreadyExists(true);
- client.resource(snapshot).create();
-
- var snapshotRef =
- FlinkStateSnapshotReference.builder()
- .namespace(NAMESPACE)
- .name(SAVEPOINT_NAME)
- .build();
- var snapshotResult =
-
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(client, snapshotRef);
- assertEquals(SAVEPOINT_PATH, snapshotResult);
- }
-
- @Test
- public void testGetValidatedFlinkStateSnapshotPathNotCompleted() {
- var snapshot = initSavepoint(IN_PROGRESS, null);
- client.resource(snapshot).create();
-
- var snapshotRef =
- FlinkStateSnapshotReference.builder()
- .namespace(NAMESPACE)
- .name(SAVEPOINT_NAME)
- .build();
- assertThrows(
- IllegalStateException.class,
- () ->
-
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(
- client, snapshotRef));
- }
-
@Test
public void testGetFlinkStateSnapshotsForResource() {
var deployment = initDeployment();
@@ -298,45 +209,25 @@ public class FlinkStateSnapshotUtilsTest {
var conf = new Configuration();
conf.set(OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE, disposeOnDelete);
var operatorConf = FlinkOperatorConfiguration.fromConfiguration(conf);
- var result =
- FlinkStateSnapshotUtils.createReferenceForUpgradeSavepoint(
- conf,
- operatorConf,
- client,
- deployment,
- SavepointFormatType.CANONICAL,
- SAVEPOINT_PATH);
+ FlinkStateSnapshotUtils.createUpgradeSnapshotResource(
+ conf,
+ operatorConf,
+ client,
+ deployment,
+ SavepointFormatType.CANONICAL,
+ SAVEPOINT_PATH);
var snapshots = TestUtils.getFlinkStateSnapshotsForResource(client,
deployment);
assertThat(snapshots)
.hasSize(1)
.allSatisfy(
snapshot -> {
- assertEquals(snapshot.getMetadata().getName(),
result.getName());
- assertEquals(
- snapshot.getMetadata().getNamespace(),
result.getNamespace());
assertEquals(
disposeOnDelete,
snapshot.getSpec().getSavepoint().getDisposeOnDelete());
+ assertEquals(
+ SAVEPOINT_PATH,
snapshot.getSpec().getSavepoint().getPath());
+
assertTrue(snapshot.getSpec().getSavepoint().getAlreadyExists());
});
- assertNull(result.getPath());
- }
-
- @Test
- public void testCreateReferenceForUpgradeSavepointWithPath() {
- var deployment = initDeployment();
- var conf = new Configuration().set(SNAPSHOT_RESOURCE_ENABLED, false);
- var operatorConf = FlinkOperatorConfiguration.fromConfiguration(conf);
- var result =
- FlinkStateSnapshotUtils.createReferenceForUpgradeSavepoint(
- conf,
- operatorConf,
- client,
- deployment,
- SavepointFormatType.CANONICAL,
- SAVEPOINT_PATH);
- assertEquals(SAVEPOINT_PATH, result.getPath());
- assertNull(result.getNamespace());
- assertNull(result.getName());
}
@Test
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
index 23720df7..37f05309 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
@@ -319,20 +318,12 @@ public class SnapshotUtilsTest {
sp.setLocation(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
assertFalse(SnapshotUtils.lastSavepointKnown(status));
- status.getJobStatus()
-
.setUpgradeSnapshotReference(FlinkStateSnapshotReference.fromPath("sp1"));
+ status.getJobStatus().setUpgradeSavepointPath("sp1");
assertTrue(SnapshotUtils.lastSavepointKnown(status));
status.getJobStatus()
- .setUpgradeSnapshotReference(
- FlinkStateSnapshotReference.fromPath(
-
AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH));
+
.setUpgradeSavepointPath(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
assertFalse(SnapshotUtils.lastSavepointKnown(status));
-
- status.getJobStatus()
- .setUpgradeSnapshotReference(
- new FlinkStateSnapshotReference("namespace", "name",
null));
- assertTrue(SnapshotUtils.lastSavepointKnown(status));
}
private static void resetTrigger(FlinkDeployment deployment, SnapshotType
snapshotType) {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 69ce55ed..f5d0712f 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -31,7 +31,6 @@ import
org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
-import
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobKind;
@@ -527,16 +526,6 @@ public class DefaultValidatorTest {
dep.getSpec().getTaskManager().getResource().setEphemeralStorage("abc");
},
"TaskManager resource ephemeral storage parse error: Character
a is neither a decimal digit number, decimal point, nor \"e\" notation
exponential mark.");
-
- testError(
- dep -> {
- dep.getSpec()
- .getJob()
- .setFlinkStateSnapshotReference(
-
FlinkStateSnapshotReference.builder().name("snapshot").build());
- dep.getSpec().getJob().setInitialSavepointPath("s0");
- },
- "Cannot set both initialSavepointPath and
flinkStateSnapshotReference in the job spec");
}
@Test
@@ -615,9 +604,8 @@ public class DefaultValidatorTest {
.serializeAndSetLastReconciledSpec(dep.getSpec(),
dep);
job.setSavepointRedeployNonce(1L);
job.setInitialSavepointPath(null);
- job.setFlinkStateSnapshotReference(null);
},
- "InitialSavepointPath and flinkStateSnapshotReference must not
be empty for savepoint redeployment");
+ "InitialSavepointPath must not be empty for savepoint
redeployment");
testError(
dep -> {
@@ -628,7 +616,7 @@ public class DefaultValidatorTest {
job.setSavepointRedeployNonce(1L);
job.setInitialSavepointPath(" ");
},
- "InitialSavepointPath and flinkStateSnapshotReference must not
be empty for savepoint redeploymen");
+ "InitialSavepointPath must not be empty for savepoint
redeploymen");
testError(
dep -> {
@@ -642,7 +630,7 @@ public class DefaultValidatorTest {
job.setSavepointRedeployNonce(2L);
job.setInitialSavepointPath(null);
},
- "InitialSavepointPath and flinkStateSnapshotReference must not
be empty for savepoint redeploymen");
+ "InitialSavepointPath must not be empty for savepoint
redeploymen");
}
@ParameterizedTest
@@ -1072,19 +1060,17 @@ public class DefaultValidatorTest {
null);
var refName = "does-not-exist";
- var namespace = "default";
var snapshot =
TestUtils.buildFlinkStateSnapshotSavepoint(
false,
JobReference.builder()
.kind(JobKind.FLINK_DEPLOYMENT)
.name(refName)
- .namespace(namespace)
.build());
testStateSnapshotValidate(
snapshot,
Optional.empty(),
- String.format("Target for snapshot %s/%s was not found",
namespace, refName));
+ String.format("Target for snapshot test/%s was not found",
refName));
}
private void testStateSnapshotValidateWithModifier(
diff --git
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 6aabf726..b4c2d4c0 100644
---
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -88,15 +88,6 @@ spec:
type: integer
entryClass:
type: string
- flinkStateSnapshotReference:
- properties:
- name:
- type: string
- namespace:
- type: string
- path:
- type: string
- type: object
initialSavepointPath:
type: string
jarURI:
@@ -10367,15 +10358,8 @@ spec:
type: string
updateTime:
type: string
- upgradeSnapshotReference:
- properties:
- name:
- type: string
- namespace:
- type: string
- path:
- type: string
- type: object
+ upgradeSavepointPath:
+ type: string
type: object
lifecycleState:
enum:
diff --git
a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
index 0df1aa87..b7ea567d 100644
---
a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
+++
b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
@@ -49,15 +49,6 @@ spec:
type: integer
entryClass:
type: string
- flinkStateSnapshotReference:
- properties:
- name:
- type: string
- namespace:
- type: string
- path:
- type: string
- type: object
initialSavepointPath:
type: string
jarURI:
@@ -209,15 +200,8 @@ spec:
type: string
updateTime:
type: string
- upgradeSnapshotReference:
- properties:
- name:
- type: string
- namespace:
- type: string
- path:
- type: string
- type: object
+ upgradeSavepointPath:
+ type: string
type: object
lifecycleState:
enum:
diff --git
a/helm/flink-kubernetes-operator/crds/flinkstatesnapshots.flink.apache.org-v1.yml
b/helm/flink-kubernetes-operator/crds/flinkstatesnapshots.flink.apache.org-v1.yml
index 96ba2ca1..3f2f3475 100644
---
a/helm/flink-kubernetes-operator/crds/flinkstatesnapshots.flink.apache.org-v1.yml
+++
b/helm/flink-kubernetes-operator/crds/flinkstatesnapshots.flink.apache.org-v1.yml
@@ -48,8 +48,6 @@ spec:
type: string
name:
type: string
- namespace:
- type: string
type: object
savepoint:
properties: