[FLINK-9074] [e2e] Add e2e test for resuming jobs from retained checkpoints

This closes #5969.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b22b481
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b22b481
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b22b481

Branch: refs/heads/release-1.5
Commit: 3b22b4810052f605cc2412399e5e2ef199efc8f1
Parents: 59f9c12
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Wed May 9 12:17:25 2018 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue May 15 16:47:54 2018 +0800

----------------------------------------------------------------------
 flink-end-to-end-tests/run-nightly-tests.sh     | 24 +++++
 flink-end-to-end-tests/test-scripts/common.sh   | 21 +++++
 .../test_resume_externalized_checkpoints.sh     | 97 ++++++++++++++++++++
 3 files changed, 142 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3b22b481/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 98d75f03..d00885d 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -130,6 +130,30 @@ fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf "\n==============================================================================\n"
+  printf "Running Resuming Externalized Checkpoint (file, async) end-to-end test\n"
+  printf "==============================================================================\n"
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true "$END_TO_END_DIR"/test-scripts/test_resume_externalized_checkpoints.sh
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==============================================================================\n"
+  printf "Running Resuming Externalized Checkpoint (file, sync) end-to-end test\n"
+  printf "==============================================================================\n"
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false "$END_TO_END_DIR"/test-scripts/test_resume_externalized_checkpoints.sh
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==============================================================================\n"
+  printf "Running Resuming Externalized Checkpoint (rocks) end-to-end test\n"
+  printf "==============================================================================\n"
+  STATE_BACKEND_TYPE=rocks "$END_TO_END_DIR"/test-scripts/test_resume_externalized_checkpoints.sh
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==============================================================================\n"
   printf "Running DataSet allround nightly end-to-end test\n"
   printf "==============================================================================\n"
   $END_TO_END_DIR/test-scripts/test_batch_allround.sh

http://git-wip-us.apache.org/repos/asf/flink/blob/3b22b481/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index da460fa..b4e38a2 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -378,6 +378,37 @@ function wait_oper_metric_num_in_records {
     done
 }
 
+# Blocks until the job with the given id has completed at least the given
+# number of checkpoints. The highest N found in "Completed checkpoint N for job
+# <job>" lines of $FLINK_DIR/log/*standalonesession*.log is taken as the count.
+# Arguments: $1 - job id, $2 - minimum number of completed checkpoints.
+# Polls once per second and never times out.
+function wait_num_checkpoints {
+    local JOB=$1
+    local NUM_CHECKPOINTS=$2
+    local N
+
+    echo "Waiting for job ($JOB) to have at least $NUM_CHECKPOINTS completed checkpoints ..."
+
+    while : ; do
+      # [0-9][0-9]*: match every checkpoint number, including ones containing 0
+      # -h: no file-name prefix, so the number is always field 3
+      # sort -n: highest number across all log files (stderr dropped: log may not exist yet)
+      N=$(grep -o -h "Completed checkpoint [0-9][0-9]* for job $JOB" "$FLINK_DIR"/log/*standalonesession*.log 2> /dev/null \
+        | awk '{print $3}' | sort -n | tail -1)
+
+      if [ -z "$N" ]; then
+        N=0
+      fi
+
+      if (( N < NUM_CHECKPOINTS )); then
+        sleep 1
+      else
+        break
+      fi
+    done
+}
+
 # make sure to clean up even in case of failures
 function cleanup {
   stop_cluster

http://git-wip-us.apache.org/repos/asf/flink/blob/3b22b481/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
new file mode 100755
index 0000000..69a5851
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -0,0 +1,122 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Runs the DataStream allround job with retained externalized checkpoints,
+# cancels it once a checkpoint has completed, and resumes a new job from the
+# single retained checkpoint.
+#
+# Environment:
+#   STATE_BACKEND_TYPE        value for --state_backend (default: file)
+#   STATE_BACKEND_FILE_ASYNC  value for --state_backend.file.async (default: true)
+
+source "$(dirname "$0")"/common.sh
+
+STATE_BACKEND_TYPE=${STATE_BACKEND_TYPE:-file}
+STATE_BACKEND_FILE_ASYNC=${STATE_BACKEND_FILE_ASYNC:-true}
+
+setup_flink_slf4j_metric_reporter
+start_cluster
+
+function test_cleanup {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  rollback_flink_slf4j_metric_reporter
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+CHECKPOINT_DIR="$TEST_DATA_DIR/externalized-chckpt-e2e-backend-dir"
+CHECKPOINT_DIR_URI="file://$CHECKPOINT_DIR"
+
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+
+# Submits the DataStream allround job in detached mode with this test's
+# checkpointing configuration and prints the submitted job's id (nothing if the
+# client output contains no "Job has been submitted with JobID" line).
+# Arguments: extra options placed before "-d" on the "flink run" command line,
+#            e.g. "-s <checkpoint path>" to resume from a checkpoint.
+function run_datastream_job {
+  "$FLINK_DIR"/bin/flink run "$@" -d "$TEST_PROGRAM_JAR" \
+    --test.semantics exactly-once \
+    --environment.externalize_checkpoint true \
+    --environment.externalize_checkpoint.cleanup retain \
+    --state_backend "$STATE_BACKEND_TYPE" \
+    --state_backend.checkpoint_directory "$CHECKPOINT_DIR_URI" \
+    --state_backend.file.async "$STATE_BACKEND_FILE_ASYNC" \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1 \
+    | grep "Job has been submitted with JobID" | sed 's/.* //g'
+}
+
+# run the DataStream allround job
+DATASTREAM_JOB=$(run_datastream_job)
+if [ -z "$DATASTREAM_JOB" ]; then
+  echo "Failed to submit the DataStream allround job."
+  PASS=""
+  exit 1
+fi
+
+wait_job_running "$DATASTREAM_JOB"
+
+wait_num_checkpoints "$DATASTREAM_JOB" 1
+wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
+
+cancel_job "$DATASTREAM_JOB"
+
+# collect the retained checkpoint directories; with nullglob the pattern
+# expands to nothing (instead of to itself) when no checkpoint was retained
+shopt -s nullglob
+CHECKPOINT_PATHS=("$CHECKPOINT_DIR/$DATASTREAM_JOB"/chk-[1-9]*)
+shopt -u nullglob
+NUM_CHECKPOINTS=${#CHECKPOINT_PATHS[@]}
+
+if (( NUM_CHECKPOINTS == 0 )); then
+  echo "Expected an externalized checkpoint to be present, but none exists."
+  PASS=""
+  exit 1
+fi
+
+if (( NUM_CHECKPOINTS > 1 )); then
+  echo "Expected only exactly 1 externalized checkpoint to be present, but $NUM_CHECKPOINTS exist."
+  PASS=""
+  exit 1
+fi
+
+CHECKPOINT_PATH=${CHECKPOINT_PATHS[0]}
+
+echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."
+DATASTREAM_JOB=$(run_datastream_job -s "$CHECKPOINT_PATH")
+if [ -z "$DATASTREAM_JOB" ]; then
+  echo "Failed to resume the DataStream allround job from $CHECKPOINT_PATH."
+  PASS=""
+  exit 1
+fi
+
+wait_job_running "$DATASTREAM_JOB"
+
+wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
+
+# if state is erroneous and the general purpose DataStream job produces alerting messages,
+# output would be non-empty and the test will not pass

Reply via email to