[FLINK-9074] [e2e] Add e2e test for resuming jobs from retained checkpoints
This closes #5969. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d640ba9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d640ba9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d640ba9 Branch: refs/heads/master Commit: 4d640ba90402f70300e1a9e8486376f7a174c68a Parents: 0a65f1e Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Wed May 9 12:17:25 2018 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue May 15 16:19:08 2018 +0800 ---------------------------------------------------------------------- flink-end-to-end-tests/run-nightly-tests.sh | 24 +++++ flink-end-to-end-tests/test-scripts/common.sh | 21 +++++ .../test_resume_externalized_checkpoints.sh | 97 ++++++++++++++++++++ 3 files changed, 142 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4d640ba9/flink-end-to-end-tests/run-nightly-tests.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index 98d75f03..d00885d 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -130,6 +130,30 @@ fi if [ $EXIT_CODE == 0 ]; then printf "\n==============================================================================\n" + printf "Running Resuming Externalized Checkpoint (file, async) end-to-end test\n" + printf "==============================================================================\n" + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh + EXIT_CODE=$? 
+fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==============================================================================\n" + printf "Running Resuming Externalized Checkpoint (file, sync) end-to-end test\n" + printf "==============================================================================\n" + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==============================================================================\n" + printf "Running Resuming Externalized Checkpoint (rocks) end-to-end test\n" + printf "==============================================================================\n" + STATE_BACKEND_TYPE=rocks $END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==============================================================================\n" printf "Running DataSet allround nightly end-to-end test\n" printf "==============================================================================\n" $END_TO_END_DIR/test-scripts/test_batch_allround.sh http://git-wip-us.apache.org/repos/asf/flink/blob/4d640ba9/flink-end-to-end-tests/test-scripts/common.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index 3fd38ed..0371f81 100644 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -380,6 +380,27 @@ function wait_oper_metric_num_in_records { done } +function wait_num_checkpoints { + JOB=$1 + NUM_CHECKPOINTS=$2 + + echo "Waiting for job ($JOB) to have at least $NUM_CHECKPOINTS completed checkpoints ..." 
+ + while : ; do + N=$(grep -o "Completed checkpoint [1-9]* for job $JOB" $FLINK_DIR/log/*standalonesession*.log | awk '{print $3}' | tail -1) + + if [ -z $N ]; then + N=0 + fi + + if (( N < NUM_CHECKPOINTS )); then + sleep 1 + else + break + fi + done +} + # make sure to clean up even in case of failures function cleanup { stop_cluster http://git-wip-us.apache.org/repos/asf/flink/blob/4d640ba9/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh new file mode 100755 index 0000000..69a5851 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh @@ -0,0 +1,97 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +source "$(dirname "$0")"/common.sh + +STATE_BACKEND_TYPE=${STATE_BACKEND_TYPE:-file} +STATE_BACKEND_FILE_ASYNC=${STATE_BACKEND_FILE_ASYNC:-true} + +setup_flink_slf4j_metric_reporter +start_cluster + +function test_cleanup { + # don't call ourselves again for another signal interruption + trap "exit -1" INT + # don't call ourselves again for normal exit + trap "" EXIT + + rollback_flink_slf4j_metric_reporter + + # make sure to run regular cleanup as well + cleanup +} +trap test_cleanup INT +trap test_cleanup EXIT + +CHECKPOINT_DIR="$TEST_DATA_DIR/externalized-chckpt-e2e-backend-dir" +CHECKPOINT_DIR_URI="file://$CHECKPOINT_DIR" + +# run the DataStream allroundjob +TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar +DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \ + --test.semantics exactly-once \ + --environment.externalize_checkpoint true \ + --environment.externalize_checkpoint.cleanup retain \ + --state_backend $STATE_BACKEND_TYPE \ + --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \ + --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \ + --sequence_generator_source.sleep_time 15 \ + --sequence_generator_source.sleep_after_elements 1 \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +wait_job_running $DATASTREAM_JOB + +wait_num_checkpoints $DATASTREAM_JOB 1 +wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200 + +cancel_job $DATASTREAM_JOB + +CHECKPOINT_PATH=$(ls -d $CHECKPOINT_DIR/$DATASTREAM_JOB/chk-[1-9]*) + +if [ -z $CHECKPOINT_PATH ]; then + echo "Expected an externalized checkpoint to be present, but none exists." + PASS="" + exit 1 +fi + +NUM_CHECKPOINTS=$(echo $CHECKPOINT_PATH | wc -l | tr -d ' ') +if (( $NUM_CHECKPOINTS > 1 )); then + echo "Expected only exactly 1 externalized checkpoint to be present, but $NUM_CHECKPOINTS exists." 
+ PASS="" + exit 1 +fi + +echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..." +DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $CHECKPOINT_PATH -d $TEST_PROGRAM_JAR \ + --test.semantics exactly-once \ + --environment.externalize_checkpoint true \ + --environment.externalize_checkpoint.cleanup retain \ + --state_backend $STATE_BACKEND_TYPE \ + --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \ + --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \ + --sequence_generator_source.sleep_time 15 \ + --sequence_generator_source.sleep_after_elements 1 \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +wait_job_running $DATASTREAM_JOB + +wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200 + +# if state is erroneous and the general purpose DataStream job produces alerting messages, +# output would be non-empty and the test will not pass
