[FLINK-9320] [e2e] Update the test_ha end-to-end test to use the general-purpose DataStream job
This closes #5990. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7ec5a93 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7ec5a93 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7ec5a93 Branch: refs/heads/release-1.5 Commit: d7ec5a9394713c5de1a2846ce96bd013ed23c53a Parents: aec4496 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Fri May 11 15:09:00 2018 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue May 22 16:49:17 2018 +0800 ---------------------------------------------------------------------- flink-end-to-end-tests/run-nightly-tests.sh | 17 ++++++++- flink-end-to-end-tests/test-scripts/common.sh | 1 - flink-end-to-end-tests/test-scripts/test_ha.sh | 40 +++++++++++++-------- 3 files changed, 41 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d7ec5a93/flink-end-to-end-tests/run-nightly-tests.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index bd91bb2..4cfd778 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -48,7 +48,22 @@ EXIT_CODE=0 if [ $EXIT_CODE == 0 ]; then - run_test "HA end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh" + run_test "Running HA (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file true false" + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + run_test "Running HA (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file false false" + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + run_test "Running HA (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true false" + EXIT_CODE=$? 
+fi + +if [ $EXIT_CODE == 0 ]; then + run_test "Running HA (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true true" EXIT_CODE=$? fi http://git-wip-us.apache.org/repos/asf/flink/blob/d7ec5a93/flink-end-to-end-tests/test-scripts/common.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index 1db5dd2..2d0f13e 100644 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -97,7 +97,6 @@ function create_ha_config() { jobmanager.heap.mb: 1024 taskmanager.heap.mb: 1024 taskmanager.numberOfTaskSlots: 4 - parallelism.default: 1 #============================================================================== # High Availability http://git-wip-us.apache.org/repos/asf/flink/blob/d7ec5a93/flink-end-to-end-tests/test-scripts/test_ha.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/test_ha.sh b/flink-end-to-end-tests/test-scripts/test_ha.sh index 2e65504..6d94c03 100755 --- a/flink-end-to-end-tests/test-scripts/test_ha.sh +++ b/flink-end-to-end-tests/test-scripts/test_ha.sh @@ -19,7 +19,7 @@ source "$(dirname "$0")"/common.sh -TEST_PROGRAM_JAR=$FLINK_DIR/examples/streaming/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2 +TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar JM_WATCHDOG_PID=0 TM_WATCHDOG_PID=0 @@ -48,12 +48,12 @@ function stop_cluster_and_watchdog() { } function verify_logs() { - local OUTPUT=$1 - local JM_FAILURES=$2 + local OUTPUT=$FLINK_DIR/log/*.out + local JM_FAILURES=$1 # verify that we have no alerts if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then - echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate." 
+ echo "FAILURE: Alerts found at the general purpose DataStream job." PASS="" fi @@ -151,7 +151,6 @@ function run_ha_test() { local BACKEND=$2 local ASYNC=$3 local INCREM=$4 - local OUTPUT=$5 local JM_KILLS=3 local CHECKPOINT_DIR="${TEST_DATA_DIR}/checkpoints/" @@ -166,11 +165,19 @@ function run_ha_test() { # submit a job in detached mode and let it run local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \ $TEST_PROGRAM_JAR \ - --backend ${BACKEND} \ - --checkpoint-dir "file://${CHECKPOINT_DIR}" \ - --async-checkpoints ${ASYNC} \ - --incremental-checkpoints ${INCREM} \ - --output ${OUTPUT} | grep "Job has been submitted with JobID" | sed 's/.* //g') + --environment.parallelism ${PARALLELISM} \ + --test.semantics exactly-once \ + --test.simulate_failure true \ + --test.simulate_failure.num_records 200 \ + --test.simulate_failure.num_checkpoints 1 \ + --test.simulate_failure.max_failures 20 \ + --state_backend ${BACKEND} \ + --state_backend.checkpoint_directory "file://${CHECKPOINT_DIR}" \ + --state_backend.file.async ${ASYNC} \ + --state_backend.rocks.incremental ${INCREM} \ + --sequence_generator_source.sleep_time 15 \ + --sequence_generator_source.sleep_after_elements 1 \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') wait_job_running ${JOB_ID} @@ -196,14 +203,17 @@ function run_ha_test() { sleep 60 done - verify_logs ${OUTPUT} ${JM_KILLS} + verify_logs ${JM_KILLS} # kill the cluster and zookeeper stop_cluster_and_watchdog } +trap stop_cluster_and_watchdog INT trap stop_cluster_and_watchdog EXIT -run_ha_test 4 "file" "false" "false" "${TEST_DATA_DIR}/output.txt" -run_ha_test 4 "rocks" "false" "false" "${TEST_DATA_DIR}/output.txt" -run_ha_test 4 "file" "true" "false" "${TEST_DATA_DIR}/output.txt" -run_ha_test 4 "rocks" "false" "true" "${TEST_DATA_DIR}/output.txt" + +STATE_BACKEND_TYPE=${1:-file} +STATE_BACKEND_FILE_ASYNC=${2:-true} +STATE_BACKEND_ROCKS_INCREMENTAL=${3:-false} + +run_ha_test 4 ${STATE_BACKEND_TYPE} 
${STATE_BACKEND_FILE_ASYNC} ${STATE_BACKEND_ROCKS_INCREMENTAL}
