This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 8479f226 [FLINK-35267][snapshot] FlinkStateSnapshot documentation and 
examples
8479f226 is described below

commit 8479f2267620cf6cb7a5cda981a769b8a0262184
Author: Mate Czagany <[email protected]>
AuthorDate: Mon Jul 15 15:20:17 2024 +0200

    [FLINK-35267][snapshot] FlinkStateSnapshot documentation and examples
---
 docs/content/docs/concepts/controller-flow.md      |   2 +-
 docs/content/docs/concepts/overview.md             |   2 +
 .../content/docs/custom-resource/job-management.md |  59 +------
 docs/content/docs/custom-resource/overview.md      |   4 +
 docs/content/docs/custom-resource/snapshots.md     | 195 +++++++++++++++++++++
 docs/content/docs/operations/upgrade.md            |   7 +-
 examples/snapshot/checkpoint.yaml                  |  28 +++
 examples/snapshot/job-from-savepoint.yaml          |  69 ++++++++
 examples/snapshot/savepoint-already-exists.yaml    |  27 +++
 examples/snapshot/savepoint-job-defaults.yaml      |  28 +++
 examples/snapshot/savepoint-job-full-spec.yaml     |  32 ++++
 11 files changed, 395 insertions(+), 58 deletions(-)

diff --git a/docs/content/docs/concepts/controller-flow.md 
b/docs/content/docs/concepts/controller-flow.md
index b6d876ba..8e8d84c0 100644
--- a/docs/content/docs/concepts/controller-flow.md
+++ b/docs/content/docs/concepts/controller-flow.md
@@ -98,7 +98,7 @@ It’s very important to understand that the Observer phase 
records a point-in-t
 The `AbstractFlinkResourceReconciler` encapsulates the core reconciliation 
flow for all Flink resource types. Let’s take a look at the high level flow 
before we go into specifics for session, application and session job resources.
 
 1. Check if the resource is ready for reconciliation or if there are any 
pending operations that should not be interrupted (manual savepoints for 
example)
-2. If this is the first deployment attempt for the resource, we simply deploy 
it. It’s important to note here that this is the only deploy operation where we 
use the `initialSavepointPath` provided in the spec.
+2. If this is the first deployment attempt for the resource, we simply deploy 
it. It’s important to note here that this is the only deploy operation where we 
use the `flinkStateSnapshotReference` provided in the spec.
 3. Next we determine if the desired spec changed and the type of change: 
`IGNORE, SCALE, UPGRADE`. Only for scale and upgrade type changes do we need to 
execute further reconciliation logic.
 4. If we have upgrade/scale spec changes we execute the upgrade logic specific 
for the resource type
 5. If we did not receive any spec change we still have to ensure that the 
currently deployed resources are fully reconciled:
diff --git a/docs/content/docs/concepts/overview.md 
b/docs/content/docs/concepts/overview.md
index 2544f601..d1b1a9c8 100644
--- a/docs/content/docs/concepts/overview.md
+++ b/docs/content/docs/concepts/overview.md
@@ -56,6 +56,8 @@ Flink Kubernetes Operator aims to capture the 
responsibilities of a human operat
   - Collect lag and utilization metrics
   - Scale job vertices to the ideal parallelism
   - Scale up and down as the load changes
+- [Snapshot management]({{< ref "docs/custom-resource/snapshots" >}})
+  - Manage snapshots via Kubernetes CRs
 ### Operations
 - Operator [Metrics]({{< ref "docs/operations/metrics-logging#metrics" >}})
   - Utilizes the well-established [Flink Metric 
System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics)
diff --git a/docs/content/docs/custom-resource/job-management.md 
b/docs/content/docs/custom-resource/job-management.md
index 74d3b026..790c825a 100644
--- a/docs/content/docs/custom-resource/job-management.md
+++ b/docs/content/docs/custom-resource/job-management.md
@@ -167,56 +167,6 @@ For this purpose you can use the `restartNonce` top level 
field in the spec. Set
 
 Restarts work exactly the same way as other application upgrades and follow 
the semantics detailed in the previous section.
 
-## Savepoint management
-
-Savepoints are triggered automatically by the system during the upgrade 
process as we have seen in the previous sections.
-
-For backup, job forking and other purposes savepoints can be triggered 
manually or periodically by the operator, however generally speaking these will 
not be used during upgrades and are not required for the correct operation.
-
-### Manual Savepoint Triggering
-
-Users can trigger savepoints manually by defining a new (different/random) 
value to the variable `savepointTriggerNonce` in the job specification:
-
-```yaml
- job:
-    ...
-    savepointTriggerNonce: 123
-```
-
-Changing the nonce value will trigger a new savepoint. Information about 
pending and last savepoint is stored in the resource status.
-
-### Periodic Savepoint Triggering
-
-The operator also supports periodic savepoint triggering through the following 
config option which can be configured on a per job level:
-
-```yaml
- flinkConfiguration:
-    ...
-    kubernetes.operator.periodic.savepoint.interval: 6h
-```
-
-There is no guarantee on the timely execution of the periodic savepoints as 
they might be delayed by unhealthy job status or other interfering user 
operation.
-
-### Savepoint History
-
-The operator automatically keeps track of the savepoint history triggered by 
upgrade or manual savepoint operations.
-This is necessary so cleanup can be performed by the operator for old 
savepoints.
-
-Users can control the cleanup behaviour by specifying a maximum age and 
maximum count for the savepoints in the history.
-
-```
-kubernetes.operator.savepoint.history.max.age: 24 h
-kubernetes.operator.savepoint.history.max.count: 5
-```
-
-{{< hint info >}}
-Savepoint cleanup happens lazily and only when the application is running.
-It is therefore very likely that savepoints live beyond the max age 
configuration.  
-{{< /hint >}}
-
-To disable savepoint cleanup by the operator you can set 
`kubernetes.operator.savepoint.cleanup.enabled: false`.
-When savepoint cleanup is disabled the operator will still collect and 
populate the savepoint history but not perform any dispose operations.
-
 ## Recovery of missing job deployments
 
 When HA is enabled, the operator can recover the Flink cluster deployments in 
cases when it was accidentally deleted
@@ -297,16 +247,17 @@ Users have two options to restore a job from a target 
savepoint / checkpoint
 
 ### Redeploy using the savepointRedeployNonce
 
-It is possible to redeploy a `FlinkDeployment` or `FlinkSessionJob` resource 
from a target savepoint by using the combination of `savepointRedeployNonce` 
and `initialSavepointPath` in the job spec:
+It is possible to redeploy a `FlinkDeployment` or `FlinkSessionJob` resource 
from a target savepoint by using the combination of `savepointRedeployNonce` 
and `flinkStateSnapshotReference` in the job spec:
 
 ```yaml
  job:
-   initialSavepointPath: file://redeploy-target-savepoint
+   flinkStateSnapshotReference:
+     path: file://redeploy-target-savepoint
    # If not set previously, set to 1, otherwise increment, e.g. 2
    savepointRedeployNonce: 1
 ```
 
-When changing the `savepointRedeployNonce` the operator will redeploy the job 
to the savepoint defined in the `initialSavepointPath`. The savepoint path must 
not be empty. 
+When changing the `savepointRedeployNonce` the operator will redeploy the job 
to the savepoint defined in the `flinkStateSnapshotReference`. The savepoint 
path must not be empty. 
 
 {{< hint warning >}}
 Rollbacks are not supported after redeployments.
@@ -320,7 +271,7 @@ However, this also means that savepoint history is lost and 
the operator won't c
  1. Locate the latest checkpoint/savepoint metafile in your configured 
checkpoint/savepoint directory.
  2. Delete the `FlinkDeployment` resource for your application
  3. Check that you have the current savepoint, and that your `FlinkDeployment` 
is deleted completely
- 4. Modify your `FlinkDeployment` JobSpec and set the `initialSavepointPath` 
to your last checkpoint location
+ 4. Modify your `FlinkDeployment` JobSpec and set 
`flinkStateSnapshotReference.path` to your last checkpoint location
  5. Recreate the deployment
 
 These steps ensure that the operator will start completely fresh from the user 
defined savepoint path and can hopefully fully recover.
diff --git a/docs/content/docs/custom-resource/overview.md 
b/docs/content/docs/custom-resource/overview.md
index 3c7cc943..6c3aadc5 100644
--- a/docs/content/docs/custom-resource/overview.md
+++ b/docs/content/docs/custom-resource/overview.md
@@ -37,6 +37,9 @@ With these two Custom Resources, we can support two different 
operational models
 - Flink application managed by the `FlinkDeployment`
 - Empty Flink session managed by the `FlinkDeployment` + multiple jobs managed 
by the `FlinkSessionJobs`. The operations on the session jobs are independent 
of each other.
 
+To help managing snapshots, there is another CR called FlinkStateSnapshot. 
This can be created by the operator in case of periodic and upgrade 
savepoints/checkpoints, or manually by the user to trigger a 
savepoint/checkpoint for a job.
+FlinkStateSnapshots will always have a FlinkDeployment or FlinkSessionJob 
linked to them in their spec.
+
 ## FlinkDeployment
 
 FlinkDeployment objects are defined in YAML format by the user and must 
contain the following required fields:
@@ -218,6 +221,7 @@ Alternatively, if you use helm to install 
flink-kubernetes-operator, it allows y
 
 ## Further information
 
+ - [Snapshots]({{< ref "docs/custom-resource/snapshots" >}})
  - [Job Management and Stateful upgrades]({{< ref 
"docs/custom-resource/job-management" >}})
  - [Deployment customization and pod templates]({{< ref 
"docs/custom-resource/pod-template" >}})
  - [Full Reference]({{< ref "docs/custom-resource/reference" >}})
diff --git a/docs/content/docs/custom-resource/snapshots.md 
b/docs/content/docs/custom-resource/snapshots.md
new file mode 100644
index 00000000..6c874c67
--- /dev/null
+++ b/docs/content/docs/custom-resource/snapshots.md
@@ -0,0 +1,195 @@
+---
+title: "Snapshots"
+weight: 3
+type: docs
+aliases:
+  - /custom-resource/snapshots.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Snapshots
+
+To create, list and delete snapshots you can use the custom resource called 
FlinkStateSnapshot. 
+The operator will use the same controller flow as in the case of 
FlinkDeployment and FlinkSessionJob to trigger the savepoint/checkpoint and 
observe its status.
+
+This feature deprecates the old `savepointInfo` and `checkpointInfo` fields 
found in the Flink resource CR status, alongside with spec fields 
`initialSavepointPath`, `savepointTriggerNonce` and `checkpointTriggerNonce`. 
+It is enabled by default using the configuration option 
`kubernetes.operator.snapshot.resource.enabled`. 
+If you set this to false, the operator will keep using the deprecated status 
fields to track snapshots.
+
+## Overview
+
+To create a savepoint or checkpoint, exactly one of the spec fields 
`savepoint` or `checkpoint` must present. 
+Furthermore, in case of a savepoint you can signal to the operator that the 
savepoint already exists using the `alreadyExists` field, and the operator will 
mark it as a successful snapshot in the next reconciliation phase.
+
+You can also instruct the Operator to start a new 
FlinkDeployment/FlinkSessionJob from an existing snapshot CR by using 
`flinkStateSnapshotReference` in the job spec.
+
+## Examples
+
+### Savepoint
+
+```yaml
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkStateSnapshot
+metadata:
+  name: example-savepoint
+spec:
+  backoffLimit: 1  # retry count, -1 for infinite, 0 for no retries (default: 
-1)
+  jobReference:
+    kind: FlinkDeployment  # FlinkDeployment or FlinkSessionJob
+    name: example-deployment  # name of the resource
+  savepoint:
+    alreadyExists: false  # optional (default: false), if true, the path is 
considered to already exist and state will be COMPLETED on first reconciliation
+    disposeOnDelete: true  # optional (default: true), dispose of savepoint 
when this FlinkStateSnapshot is removed, job needs to be running
+    formatType: CANONICAL  # optional (default: CANONICAL), format type of 
savepoint
+    path: /flink-data/savepoints-custom  # optional (default: job savepoint 
path)
+```
+
+### Checkpoint
+
+```yaml
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkStateSnapshot
+metadata:
+  name: example-checkpoint
+spec:
+  backoffLimit: 1
+  jobReference:
+    kind: FlinkDeployment
+    name: example-deployment
+  checkpoint: {}
+```
+
+### Start job from existing snapshot
+
+```yaml
+ job:
+   flinkStateSnapshotReference:
+     namespace: flink  # not required if it's in the same namespace
+     name: example-savepoint
+```
+
+{{< hint warning >}}
+While it is possible to start a job from a FlinkStateSnapshot with checkpoint 
type, checkpoint data is owned by Flink, and might be deleted by Flink anytime 
after triggering the checkpoint.
+{{< /hint >}}
+
+
+## Snapshot CR lifecycle
+
+### Snapshot creation
+
+When a new FlinkStateSnapshot CR is created, in the first reconciliation phase 
the operator will trigger the savepoint/checkpoint for the linked deployment 
via REST API. 
+The resulting trigger ID will be added to the CR Status.
+
+In the next observation phase the operator will check all the in-progress 
snapshots and query their state. 
+If the snapshot was successful, the path will be added to the CR Status.
+
+If the triggered snapshot is a savepoint and `spec.savepoint.alreadyExists` is 
set to true, on the first reconciliation the operator will populate its 
`status` fields with `COMPLETED` state, and copy the savepoint path found in 
the spec to `status.path`.
+
+### Snapshot errors
+
+If the operator encountered any errors during snapshot 
observation/reconciliation, the `error` field will be populated in the CR 
status and the `failures` field will be incremented by 1. 
+If the backoff limit specified in the spec is reached, the snapshot will enter 
a `FAILED` state, and won't be retried. 
+If it's not reached, the Operator will continuously back off retrying the 
snapshot (10s, 20s, 40s, ...).
+
+In case of any error there will also be a new Event generated for the snapshot 
resource containing the error message.
+
+{{< hint info >}}
+For checkpoints, after the operator has ensured that the checkpoint was 
successful, it will attempt to fetch its final path via Flink REST API. 
+Any errors experienced during this step will generate a Kubernetes event, but 
will not populate the `error` field, and will mark the checkpoint as 
`COMPLETED`.
+The `path` field will stay empty though.
+{{< /hint >}}
+
+### Snapshot abandonment
+
+If the referenced Flink job can't be found or is stopped after triggering a 
snapshot, the state of the snapshot will be `ABANDONED` and won't be retried.
+
+### Savepoint disposal on deletion
+
+In case of savepoints, if `spec.savepoint.disposeOnDelete` is true, the 
operator will automatically dispose the savepoint on the filesystem when the CR 
gets deleted. 
+This however requires the referenced Flink resource to be alive, as this 
operation is done using Flink REST API.
+
+This feature is not available for checkpoints.
+
+
+## Triggering snapshots
+
+Upgrade savepoints are triggered automatically by the system during the 
upgrade process as we have seen in the previous sections.
+In this case, the savepoint path will also be recorded in the 
`upgradeSnapshotReference` job status field, which the operator will use when 
restarting the job.
+
+For backup, job forking and other purposes savepoint and checkpoints can be 
triggered manually or periodically by the operator, however generally speaking 
these will not be used during upgrades and are not required for the correct 
operation.
+
+### Manual Checkpoint Triggering
+
+Users can trigger snapshots manually by defining a new (different/random) 
value to the variable `savepointTriggerNonce` or `checkpointTriggerNonce` in 
the job specification:
+
+```yaml
+ job:
+    ...
+    savepointTriggerNonce: 123
+    checkpointTriggerNonce: 123
+    ...
+```
+
+Changing the nonce value will trigger a new snapshot. If FlinkStateSnapshot 
resources are enabled, a new snapshot CR will be automatically created.
+If disabled, information about pending and last snapshots is stored in the 
FlinkDeployment/FlinkSessionJob CR status.
+
+### Periodic Snapshot Triggering
+
+The operator also supports periodic snapshot triggering through the following 
config option which can be configured on a per job level:
+
+```yaml
+ flinkConfiguration:
+    ...
+    kubernetes.operator.periodic.savepoint.interval: 6h
+    kubernetes.operator.periodic.checkpoint.interval: 6h
+```
+
+There is no guarantee on the timely execution of the periodic snapshots as 
they might be delayed by unhealthy job status or other interfering user 
operation.
+
+### Snapshot History
+
+The operator automatically keeps track of the snapshot history triggered by 
upgrade, manual and periodic snapshot operations.
+This is necessary so cleanup can be performed by the operator for old 
snapshots.
+
+Users can control the cleanup behaviour by specifying a maximum age and 
maximum count for the savepoint and checkpoint resources in the history.
+
+```
+kubernetes.operator.savepoint.history.max.age: 24 h
+kubernetes.operator.savepoint.history.max.count: 5
+
+kubernetes.operator.checkpoint.history.max.age: 24 h
+kubernetes.operator.checkpoint.history.max.count: 5
+```
+
+{{< hint warning >}}
+Checkpoint history history cleanup is only supported if FlinkStateSnapshot 
resources are enabled.
+This operation will only delete the FlinkStateSnapshot CR, and will never 
delete any checkpoint data on the filesystem.
+{{< /hint >}}
+
+{{< hint info >}}
+Savepoint cleanup happens lazily and only when the Flink resource associated 
with the snapshot is running.
+It is therefore very likely that savepoints live beyond the max age 
configuration.  
+{{< /hint >}}
+
+To also dispose of savepoint data on savepoint cleanup, set 
`kubernetes.operator.savepoint.dispose-on-delete: true`. 
+This config will set `spec.savepoint.disposeOnDelete` to true for 
FlinkStateSnapshot CRs created by periodic savepoints and manual ones created 
using `savepointTriggerNonce`.
+
+To disable savepoint/checkpoint cleanup by the operator you can set 
`kubernetes.operator.savepoint.cleanup.enabled: false` and 
`kubernetes.operator.checkpoint.cleanup.enabled: false`.
+
diff --git a/docs/content/docs/operations/upgrade.md 
b/docs/content/docs/operations/upgrade.md
index e9b7ec92..65f188b4 100644
--- a/docs/content/docs/operations/upgrade.md
+++ b/docs/content/docs/operations/upgrade.md
@@ -148,19 +148,20 @@ Here is a reference example of upgrading a 
`basic-checkpoint-ha-example` deploym
     ```
 5. Restore the job:
 
-   Deploy the previously deleted job using this 
[FlinkDeployemnt](https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml)
 with `v1beta1` and explicitly set the `job.initialSavepointPath` to the 
savepoint location obtained from the step 1.
+   Deploy the previously deleted job using this 
[FlinkDeployemnt](https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml)
 with `v1beta1` and explicitly set the `job.flinkStateSnapshotReference.path` 
to the savepoint location obtained from the step 1.
 
     ```
     spec:
       ...
       job:
-        initialSavepointPath: 
/flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata
+        flinkStateSnapshotReference: 
+          path: /flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata
         upgradeMode: savepoint
       ...
     ```
     Alternatively, we may use this command to edit and deploy the manifest:
     ```sh
-    wget -qO - 
https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml|
 yq w - "spec.job.initialSavepointPath" 
"/flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata"| kubectl apply 
-f -
+    wget -qO - 
https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml|
 yq w - "spec.job.flinkStateSnapshotReference.path" 
"/flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata"| kubectl apply 
-f -
     ```
    Finally, verify that `deploy/basic-checkpoint-ha-example` log has:
     ```
diff --git a/examples/snapshot/checkpoint.yaml 
b/examples/snapshot/checkpoint.yaml
new file mode 100644
index 00000000..0e3a56d8
--- /dev/null
+++ b/examples/snapshot/checkpoint.yaml
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkStateSnapshot
+metadata:
+  name: example-checkpoint
+spec:
+  backoffLimit: 0
+  jobReference:
+    kind: FlinkDeployment
+    name: example-deployment
+  checkpoint: {} # This will specify that we want a checkpoint
diff --git a/examples/snapshot/job-from-savepoint.yaml 
b/examples/snapshot/job-from-savepoint.yaml
new file mode 100644
index 00000000..698ad220
--- /dev/null
+++ b/examples/snapshot/job-from-savepoint.yaml
@@ -0,0 +1,69 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Create a FlinkStateSnapshot with an already existing savepoint on the FS, 
and starts a new job with that snapshot.
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkStateSnapshot
+metadata:
+  name: example-savepoint
+spec:
+  savepoint:
+    alreadyExists: true
+    disposeOnDelete: false
+    path: file:///flink-data/savepoints/savepoint-45305c-d867504446e0
+
+---
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+  name: example-deployment
+spec:
+  image: flink:1.19
+  flinkVersion: v1_19
+  flinkConfiguration:
+    state.checkpoints.dir: file:///flink-data/checkpoints
+    state.savepoints.dir: file:///flink-data/savepoints
+  serviceAccount: flink
+  jobManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  taskManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  podTemplate:
+    spec:
+      containers:
+        - name: flink-main-container
+          volumeMounts:
+            - mountPath: /flink-data
+              name: flink-volume
+      volumes:
+        - name: flink-volume
+          hostPath:
+            path: /tmp/flink
+            type: Directory
+  job:
+    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+    parallelism: 2
+    upgradeMode: savepoint
+    flinkStateSnapshotReference:
+      name: example-savepoint
+      namespace: flink
diff --git a/examples/snapshot/savepoint-already-exists.yaml 
b/examples/snapshot/savepoint-already-exists.yaml
new file mode 100644
index 00000000..ebacde13
--- /dev/null
+++ b/examples/snapshot/savepoint-already-exists.yaml
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkStateSnapshot
+metadata:
+  name: example-savepoint
+spec:
+  savepoint:
+    alreadyExists: true
+    disposeOnDelete: false
+    path: /flink-data/savepoints/savepoint-98682a-21931bf10c39
diff --git a/examples/snapshot/savepoint-job-defaults.yaml 
b/examples/snapshot/savepoint-job-defaults.yaml
new file mode 100644
index 00000000..9cd26e37
--- /dev/null
+++ b/examples/snapshot/savepoint-job-defaults.yaml
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkStateSnapshot
+metadata:
+  name: example-savepoint
+spec:
+  backoffLimit: 0
+  jobReference:
+    kind: FlinkDeployment
+    name: example-deployment
+  savepoint: {}  # This will use the savepoint path configured in the job 
config.
diff --git a/examples/snapshot/savepoint-job-full-spec.yaml 
b/examples/snapshot/savepoint-job-full-spec.yaml
new file mode 100644
index 00000000..5a0356a4
--- /dev/null
+++ b/examples/snapshot/savepoint-job-full-spec.yaml
@@ -0,0 +1,32 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkStateSnapshot
+metadata:
+  name: example-savepoint
+spec:
+  backoffLimit: 0
+  jobReference:
+    kind: FlinkDeployment
+    name: example-deployment
+  savepoint:
+    alreadyExists: false
+    disposeOnDelete: true
+    formatType: CANONICAL
+    path: /flink-data/savepoints-custom

Reply via email to