This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new cdd81c79 [FLINK-35266][snapshot] Add E2E tests for FlinkStateSnapshot
cdd81c79 is described below

commit cdd81c7973a9920916f6a32e1ea9cadbeab4330b
Author: Mate Czagany <[email protected]>
AuthorDate: Wed Aug 28 11:56:28 2024 +0200

    [FLINK-35266][snapshot] Add E2E tests for FlinkStateSnapshot
---
 .github/workflows/ci.yml            |  11 +++
 e2e-tests/data/savepoint.yaml       |  32 +++++++
 e2e-tests/test_autoscaler.sh        |   0
 e2e-tests/test_dynamic_config.sh    |   0
 e2e-tests/test_flink_operator_ha.sh |   0
 e2e-tests/test_snapshot.sh          | 178 ++++++++++++++++++++++++++++++++++++
 e2e-tests/utils.sh                  |  31 +++++++
 7 files changed, 252 insertions(+)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a14e9524..84292c97 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -87,6 +87,7 @@ jobs:
           - test_multi_sessionjob.sh
           - test_autoscaler.sh
           - test_flink_operator_ha.sh
+          - test_snapshot.sh
         include:
           - namespace: flink
             extraArgs: '--create-namespace --set "watchNamespaces={default,flink}"'
@@ -111,20 +112,30 @@ jobs:
             test: test_autoscaler.sh
           - mode: standalone
             test: test_dynamic_config.sh
+          - mode: standalone
+            test: test_snapshot.sh
           - version: v1_16
             test: test_autoscaler.sh
           - version: v1_16
             test: test_dynamic_config.sh
           - version: v1_16
             test: test_flink_operator_ha.sh
+          - version: v1_16
+            test: test_snapshot.sh
           - version: v1_17
             test: test_dynamic_config.sh
           - version: v1_17
             test: test_flink_operator_ha.sh
+          - version: v1_17
+            test: test_snapshot.sh
           - version: v1_18
             test: test_dynamic_config.sh
           - version: v1_18
             test: test_flink_operator_ha.sh
+          - version: v1_18
+            test: test_snapshot.sh
+          - version: v1_19
+            test: test_snapshot.sh
           - version: v1_16
             java-version: 17
           - version: v1_17
diff --git a/e2e-tests/data/savepoint.yaml b/e2e-tests/data/savepoint.yaml
new file mode 100644
index 00000000..e32e9291
--- /dev/null
+++ b/e2e-tests/data/savepoint.yaml
@@ -0,0 +1,32 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkStateSnapshot
+metadata:
+  name: example-savepoint
+spec:
+  backoffLimit: 0
+  jobReference:
+    kind: FlinkDeployment
+    name: flink-example-statemachine
+  savepoint:
+    alreadyExists: false
+    disposeOnDelete: true
+    formatType: CANONICAL
+
diff --git a/e2e-tests/test_autoscaler.sh b/e2e-tests/test_autoscaler.sh
old mode 100644
new mode 100755
diff --git a/e2e-tests/test_dynamic_config.sh b/e2e-tests/test_dynamic_config.sh
old mode 100644
new mode 100755
diff --git a/e2e-tests/test_flink_operator_ha.sh b/e2e-tests/test_flink_operator_ha.sh
old mode 100644
new mode 100755
diff --git a/e2e-tests/test_snapshot.sh b/e2e-tests/test_snapshot.sh
new file mode 100755
index 00000000..8d080465
--- /dev/null
+++ b/e2e-tests/test_snapshot.sh
@@ -0,0 +1,178 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This script tests the FlinkStateSnapshot CR as follows:
+# 1. Create deployment with FlinkStateSnapshot disabled and upgrade it. Then
+#    enable FlinkStateSnapshot and assert that the saved savepoint was used.
+# 2. Trigger and dispose of savepoint by creating a new FlinkStateSnapshot
+#    savepoint CR
+# 3. Trigger savepoint by using savepoint trigger nonce
+# 4. Trigger checkpoint by using trigger nonce
+# 5. Test periodic savepoints triggered by the operator
+# 6. Change job to upgrade mode, suspend job and assert new FlinkStateSnapshot
+#    CR created
+
+SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
+source "${SCRIPT_DIR}/utils.sh"
+
+# Resources exercised by this test: the FlinkDeployment and the manually
+# created snapshot CR.
+CLUSTER_ID="flink-example-statemachine"
+APPLICATION_YAML="${SCRIPT_DIR}/data/flinkdep-cr.yaml"
+APPLICATION_IDENTIFIER="flinkdep/$CLUSTER_ID"
+
+SAVEPOINT_YAML="${SCRIPT_DIR}/data/savepoint.yaml"
+SAVEPOINT_IDENTIFIER="flinksnp/example-savepoint"
+
+# Timeout in seconds handed to the wait helpers and to kubectl.
+TIMEOUT=300
+
+# Register cleanup of the deployment and of the snapshot CRs for script exit.
+on_exit cleanup_and_exit "$APPLICATION_YAML" "$TIMEOUT" "$CLUSTER_ID"
+on_exit cleanup_snapshots "$CLUSTER_ID" "$TIMEOUT"
+
+retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
+# Switch the job to savepoint upgrade mode and set web.checkpoints.history to 1000.
+kubectl patch flinkdep "${CLUSTER_ID}" --type merge --patch \
+  '{"spec":{"job":{"upgradeMode": "savepoint"},"flinkConfiguration":{"web.checkpoints.history":"1000"}}}'
+
+wait_for_jobmanager_running "$CLUSTER_ID" "$TIMEOUT"
+jm_pod_name=$(get_jm_pod_name "$CLUSTER_ID")
+
+# Proceed only once a checkpoint has completed and the job is reported RUNNING.
+wait_for_logs "$jm_pod_name" "Completed checkpoint [0-9]+ for job" "${TIMEOUT}" || exit 1
+wait_for_status "$APPLICATION_IDENTIFIER" '.status.jobManagerDeploymentStatus' READY "${TIMEOUT}" || exit 1
+wait_for_status "$APPLICATION_IDENTIFIER" '.status.jobStatus.state' RUNNING "${TIMEOUT}" || exit 1
+
+
+
+# Test upgrade by setting legacy field
+kubectl patch flinkdep "${CLUSTER_ID}" --type merge --patch \
+  '{"spec":{"job":{"state": "suspended"}}}'
+wait_for_status "$APPLICATION_IDENTIFIER" '.status.lifecycleState' "SUSPENDED" "${TIMEOUT}" || exit 1
+
+location=$(kubectl get "$APPLICATION_IDENTIFIER" -o yaml | yq '.status.jobStatus.upgradeSnapshotReference.path')
+# yq prints the literal string "null" for a missing field, so an empty-string
+# check alone would never detect a missing savepoint path.
+if [ -z "$location" ] || [ "$location" == "null" ]; then echo "Legacy savepoint location was empty"; exit 1; fi
+echo "Removing upgradeSnapshotReference and setting lastSavepoint"
+# Move the savepoint path from the new status field to the legacy lastSavepoint field.
+kubectl patch flinkdep "${CLUSTER_ID}" --type=merge --subresource status --patch \
+  '{"status":{"jobStatus":{"upgradeSnapshotReference":null,"savepointInfo":{"lastSavepoint":{"timeStamp": 0, "location": "'"$location"'", "triggerNonce": 0}}}}}'
+
+# Delete operator Pod to clear CR state cache
+# NOTE(review): the helper outputs are left unquoted as in the original, in case
+# they can return more than one word — confirm against utils.sh.
+kubectl delete pod -n $(get_operator_pod_namespace) $(get_operator_pod_name)
+sleep 20
+retry_times 10 10 "kubectl wait -n $(get_operator_pod_namespace) --for=condition=Ready --timeout=${TIMEOUT}s pod/$(get_operator_pod_name)" || exit 1
+
+echo "Restarting deployment and asserting savepoint path used"
+kubectl patch flinkdep "${CLUSTER_ID}" --type merge --patch '{"spec":{"job": {"state": "running" } } }'
+wait_for_jobmanager_running "$CLUSTER_ID" "$TIMEOUT"
+wait_for_status "$APPLICATION_IDENTIFIER" '.status.jobManagerDeploymentStatus' READY "${TIMEOUT}" || exit 1
+wait_for_status "$APPLICATION_IDENTIFIER" '.status.jobStatus.state' RUNNING "${TIMEOUT}" || exit 1
+
+# The restarted JobManager must log a restore from the savepoint path recorded above.
+jm_pod_name=$(get_jm_pod_name "$CLUSTER_ID")
+wait_for_logs "$jm_pod_name" "Restoring job .* from Savepoint" "${TIMEOUT}" || exit 1
+wait_for_logs "$jm_pod_name" "execution.savepoint.path, ${location}" "${TIMEOUT}" || exit 1
+wait_for_logs "$jm_pod_name" "Completed checkpoint [0-9]+ for job" "${TIMEOUT}" || exit 1
+
+
+
+# Enable FlinkStateSnapshot CRs
+kubectl patch flinkdep "${CLUSTER_ID}" --type merge --patch \
+  '{"spec":{"flinkConfiguration":{"kubernetes.operator.snapshot.resource.enabled":"true"}}}'
+# The job ID is extracted from the JobManager log for diagnostics only; nothing
+# later in this script reads job_id.
+job_id=$(kubectl logs "$jm_pod_name" -c flink-main-container | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}')
+echo "Found job ID $job_id"
+
+
+
+# Testing manual savepoint trigger and disposal via CR
+echo "Creating manual savepoint..."
+retry_times 5 30 "kubectl apply -f $SAVEPOINT_YAML" || exit 1
+wait_for_status "$SAVEPOINT_IDENTIFIER" '.status.state' "COMPLETED" "$TIMEOUT" || exit 1
+
+location=$(kubectl get "$SAVEPOINT_IDENTIFIER" -o yaml | yq '.status.path')
+# yq prints the literal string "null" for a missing field; treat it as empty.
+if [ -z "$location" ] || [ "$location" == "null" ]; then echo "Manual savepoint location was empty"; exit 1; fi
+
+echo "Disposing manual savepoint..."
+# Deleting the CR is expected to make the JobManager log the savepoint disposal.
+kubectl delete "$SAVEPOINT_IDENTIFIER"
+wait_for_logs "$jm_pod_name" "Disposing savepoint $location" "${TIMEOUT}" || exit 1
+
+
+
+# Testing manual savepoint via trigger nonce
+kubectl patch "$APPLICATION_IDENTIFIER" --type merge --patch '{"spec":{"job": {"savepointTriggerNonce": 123456 } } }'
+
+echo "Waiting for manual savepoint..."
+snapshot=$(wait_for_snapshot "$CLUSTER_ID" "savepoint" "manual" "${TIMEOUT}")
+if [ "$snapshot" == "" ]; then echo "Could not find snapshot"; exit 1; fi
+echo "Found snapshot with name $snapshot"
+
+# NOTE(review): '.status.spec.checkpoint' may have been meant as
+# '.spec.checkpoint' — confirm against the FlinkStateSnapshot CRD.
+wait_for_status "flinksnp/$snapshot" '.status.spec.checkpoint' null "$TIMEOUT" || exit 1
+# The legacy trigger fields on the deployment status must stay unset.
+wait_for_status "$APPLICATION_IDENTIFIER" '.status.jobStatus.savepointInfo.triggerId' null "$TIMEOUT" || exit 1
+wait_for_status "$APPLICATION_IDENTIFIER" '.status.jobStatus.savepointInfo.triggerTimestamp' null "$TIMEOUT" || exit 1
+snapshot_path=$(kubectl get "flinksnp/$snapshot" -o yaml | yq '.status.path')
+# yq prints the literal string "null" for a missing field; treat it as empty.
+if [ -z "$snapshot_path" ] || [ "$snapshot_path" == "null" ]; then echo "Manual savepoint location was empty"; exit 1; fi
+kubectl delete "flinksnp/$snapshot"
+
+
+
+# Testing manual checkpoint via trigger nonce
+kubectl patch "$APPLICATION_IDENTIFIER" --type merge --patch '{"spec":{"job": {"checkpointTriggerNonce": 123456 } } }'
+
+echo "Waiting for manual checkpoint..."
+snapshot=$(wait_for_snapshot "$CLUSTER_ID" "checkpoint" "manual" "${TIMEOUT}")
+if [ "$snapshot" == "" ]; then echo "Could not find snapshot"; exit 1; fi
+
+echo "Found checkpoint with name $snapshot"
+
+# NOTE(review): '.status.spec.savepoint' may have been meant as
+# '.spec.savepoint' — confirm against the FlinkStateSnapshot CRD.
+wait_for_status "flinksnp/$snapshot" '.status.spec.savepoint' null "$TIMEOUT" || exit 1
+snapshot_path=$(kubectl get "flinksnp/$snapshot" -o yaml | yq '.status.path')
+# yq prints the literal string "null" for a missing field; treat it as empty.
+if [ -z "$snapshot_path" ] || [ "$snapshot_path" == "null" ]; then echo "Manual checkpoint location was empty"; exit 1; fi
+kubectl delete "flinksnp/$snapshot"
+
+
+# Test periodic savepoints
+kubectl patch flinkdep "${CLUSTER_ID}" --type merge --patch \
+  '{"spec":{"flinkConfiguration":{"kubernetes.operator.periodic.savepoint.interval":"60s"}}}'
+sleep 20
+
+echo "Waiting for periodic savepoint..."
+snapshot=$(wait_for_snapshot "$CLUSTER_ID" "savepoint" "periodic" "${TIMEOUT}")
+if [ "$snapshot" == "" ]; then echo "Could not find snapshot"; exit 1; fi
+
+echo "Found periodic savepoint: $snapshot"
+snapshot_path=$(kubectl get "flinksnp/$snapshot" -o yaml | yq '.status.path')
+# yq prints the literal string "null" for a missing field; treat it as empty.
+if [ -z "$snapshot_path" ] || [ "$snapshot_path" == "null" ]; then echo "Periodic savepoint location was empty"; exit 1; fi
+# Reset the periodic savepoint interval to an empty value before the next test.
+kubectl patch flinkdep "${CLUSTER_ID}" --type merge --patch \
+  '{"spec":{"flinkConfiguration":{"kubernetes.operator.periodic.savepoint.interval":""}}}'
+
+
+# Test upgrade savepoint
+echo "Suspending deployment..."
+kubectl patch flinkdep "${CLUSTER_ID}" --type merge --patch \
+  '{"spec":{"job":{"state":"suspended"}}}'
+wait_for_status "$APPLICATION_IDENTIFIER" '.status.lifecycleState' "SUSPENDED" "${TIMEOUT}" || exit 1
+
+echo "Waiting for upgrade savepoint..."
+snapshot=$(wait_for_snapshot "$CLUSTER_ID" "savepoint" "upgrade" "${TIMEOUT}")
+if [ "$snapshot" == "" ]; then echo "Could not find snapshot"; exit 1; fi
+echo "Found upgrade snapshot: $snapshot"
+# The deployment status must reference the snapshot CR that was just created.
+wait_for_status "$APPLICATION_IDENTIFIER" '.status.jobStatus.upgradeSnapshotReference.name' "$snapshot" "${TIMEOUT}" || exit 1
+
+location=$(kubectl get "flinksnp/$snapshot" -o yaml | yq '.status.path')
+# yq prints the literal string "null" for a missing field; treat it as empty.
+if [ -z "$location" ] || [ "$location" == "null" ]; then echo "Upgrade savepoint location was empty"; exit 1; fi
+
+
+
+echo "Restarting deployment..."
+kubectl patch flinkdep "${CLUSTER_ID}" --type merge --patch '{"spec":{"job": {"state": "running" } } }'
+wait_for_jobmanager_running "$CLUSTER_ID" "$TIMEOUT"
+wait_for_status "$APPLICATION_IDENTIFIER" '.status.jobManagerDeploymentStatus' READY "${TIMEOUT}" || exit 1
+wait_for_status "$APPLICATION_IDENTIFIER" '.status.jobStatus.state' RUNNING "${TIMEOUT}" || exit 1
+
+jm_pod_name=$(get_jm_pod_name "$CLUSTER_ID")
+
+# Check that the new JobManager restores from the upgrade savepoint taken above
+wait_for_logs "$jm_pod_name" "Restoring job .* from Savepoint" "${TIMEOUT}" || exit 1
+wait_for_logs "$jm_pod_name" "execution.savepoint.path, ${location}" "${TIMEOUT}" || exit 1
+wait_for_logs "$jm_pod_name" "Completed checkpoint [0-9]+ for job" "${TIMEOUT}" || exit 1
+
+kubectl delete "flinksnp/$snapshot"
+
+echo "Successfully run the FlinkStateSnapshot test"
+
diff --git a/e2e-tests/utils.sh b/e2e-tests/utils.sh
index fe48acca..5356a40f 100755
--- a/e2e-tests/utils.sh
+++ b/e2e-tests/utils.sh
@@ -97,6 +97,25 @@ function wait_for_event {
   exit 1
 }
 
+function wait_for_snapshot {
+  local job_name=$1
+  local snapshot_type=$2
+  local trigger_type=$3
+  local timeout=$4
+  local prefix="$job_name-$snapshot_type-$trigger_type"
+
+  for i in $(seq 1 ${timeout}); do
+    snapshot_name=$(kubectl get flinksnp --sort-by=.metadata.creationTimestamp 
| grep $prefix | awk '{print $1}' | tail -n 1)
+    if [ "$snapshot_name" ]; then
+      kubectl wait --timeout=${timeout}s 
--for=jsonpath='{.status.state}'=COMPLETED flinksnp/$snapshot_name > /dev/null 
|| return 1
+      echo "$snapshot_name"
+      return 0
+    fi
+    sleep 1
+  done
+  return 1
+}
+
 function assert_available_slots() {
   expected=$1
   CLUSTER_ID=$2
@@ -331,6 +350,18 @@ function operator_cleanup_and_exit() {
   fi
 }
 
+# Removes every FlinkStateSnapshot whose name starts with the given cluster ID.
+# Arguments:
+#   $1 - cluster ID used as the name prefix
+#   $2 - timeout in seconds for each kubectl delete
+function cleanup_snapshots() {
+    echo "Starting cleanup of FlinkStateSnapshot resources"
+
+    # Locals, so the caller's CLUSTER_ID / TIMEOUT globals are not overwritten.
+    local cluster_id=$1
+    local timeout=$2
+
+    # Finalizers are cleared first, presumably so deletion cannot block on
+    # them — TODO confirm. xargs -r skips kubectl entirely when nothing matches,
+    # instead of running it once without a resource name.
+    kubectl get flinksnp | grep "^${cluster_id}" | awk '{print $1}' \
+      | xargs -r -n 1 -P 5 kubectl patch flinksnp -p '{"metadata":{"finalizers":null}}' --type=merge
+    # Listed again because the set of remaining resources may have changed.
+    kubectl get flinksnp | grep "^${cluster_id}" | awk '{print $1}' \
+      | xargs -r -n 1 -P 5 kubectl delete --timeout="${timeout}s" flinksnp
+
+    echo "Finished cleaning up FlinkStateSnapshot resources"
+}
+
 function _on_exit_callback {
   # Export the exit code so that it could be used by the callback commands
   export TRAPPED_EXIT_CODE=$?

Reply via email to