Repository: flink Updated Branches: refs/heads/master 656ac831c -> bb118104b
[FLINK-6539] Add automated batch WordCount end-to-end test Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f1c764e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f1c764e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f1c764e Branch: refs/heads/master Commit: 4f1c764e969d8c87db83cb1f8687bb52ec211dbc Parents: 656ac83 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Tue May 9 13:39:27 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Mon Jul 24 11:17:44 2017 +0200 ---------------------------------------------------------------------- pom.xml | 1 + test-infra/end-to-end-test/common.sh | 118 +++++++++++++++++++ test-infra/end-to-end-test/test-data/words | 1 + .../end-to-end-test/test_batch_wordcount.sh | 41 +++++++ 4 files changed, 161 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4f1c764e/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 2f14765..8c344b6 100644 --- a/pom.xml +++ b/pom.xml @@ -974,6 +974,7 @@ under the License. <exclude>flink-connectors/flink-avro/src/test/resources/avro/*.avsc</exclude> <exclude>out/test/flink-avro/avro/user.avsc</exclude> <exclude>flink-libraries/flink-table/src/test/scala/resources/*.out</exclude> + <exclude>test-infra/end-to-end-test/test-data/*</exclude> <!-- snapshots --> <exclude>**/src/test/resources/*-snapshot</exclude> http://git-wip-us.apache.org/repos/asf/flink/blob/4f1c764e/test-infra/end-to-end-test/common.sh ---------------------------------------------------------------------- diff --git a/test-infra/end-to-end-test/common.sh b/test-infra/end-to-end-test/common.sh new file mode 100644 index 0000000..00e4e75 --- /dev/null +++ b/test-infra/end-to-end-test/common.sh @@ -0,0 +1,118 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +set -e +set -o pipefail + +export FLINK_DIR="$1" +export CLUSTER_MODE="$2" + +export PASS=1 + +echo "Flink dist directory: $FLINK_DIR" + +# used to randomize created directories +export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N) +echo "TEST_DATA_DIR: $TEST_DATA_DIR" + +function start_cluster { + if [[ "$CLUSTER_MODE" == "local" ]]; then + $FLINK_DIR/bin/start-local.sh + elif [[ "$CLUSTER_MODE" == "cluster" ]]; then + $FLINK_DIR/bin/start-cluster.sh + else + echo "Unrecognized cluster mode: $CLUSTER_MODE" + exit + fi + + # wait for TaskManager to come up + # wait roughly 10 seconds + for i in {1..10}; do + # without the || true this would exit our script if the JobManager is not yet up + QUERY_RESULT=$(curl "http://localhost:8081/taskmanagers" || true) + + if [[ "$QUERY_RESULT" == "" ]]; then + echo "JobManager is not yet up" + elif [[ "$QUERY_RESULT" != "{\"taskmanagers\":[]}" ]]; then + break + fi + + echo "Waiting for cluster to come up..." + sleep 1 + done +} + +function stop_cluster { + if [[ "$CLUSTER_MODE" == "local" ]]; then + $FLINK_DIR/bin/stop-local.sh + elif [[ "$CLUSTER_MODE" == "cluster" ]]; then + $FLINK_DIR/bin/stop-cluster.sh + fi + + if grep -riq "error" $FLINK_DIR/log; then + echo "Found error in log files." + PASS="" + fi + if grep -riq "exception" $FLINK_DIR/log; then + echo "Found exception in log files." + PASS="" + fi + + for f in `ls $FLINK_DIR/log/*.out` + do + if [[ -s $f ]]; then + echo "Found non-empty file $f" + PASS="" + fi + done + + rm $FLINK_DIR/log/* +} + +function check_result_hash { + local name=$1 + local outfile_prefix=$2 + local expected=$3 + + local actual=$(LC_ALL=C sort $outfile_prefix* | md5sum | awk '{print $1}' \ + || LC_ALL=C sort $outfile_prefix* | md5 -q) || exit 2 # OSX + if [[ "$actual" != "$expected" ]] + then + echo "FAIL $name: Output hash mismatch. Got $actual, expected $expected." + PASS="" + echo "head hexdump of actual:" + head $outfile_prefix* | hexdump -c + else + echo "pass $name" + # Output files are left behind in /tmp + fi +} + +function check_all_pass { + if [[ ! "$PASS" ]] + then + echo "One or more tests FAILED." + exit 1 + fi + echo "All tests PASS" +} + +function clean_data_dir { + rm -r $TEST_DATA_DIR +} http://git-wip-us.apache.org/repos/asf/flink/blob/4f1c764e/test-infra/end-to-end-test/test-data/words ---------------------------------------------------------------------- diff --git a/test-infra/end-to-end-test/test-data/words b/test-infra/end-to-end-test/test-data/words new file mode 100644 index 0000000..0213510 --- /dev/null +++ b/test-infra/end-to-end-test/test-data/words @@ -0,0 +1 @@ +Hello World how are you, my dear dear world http://git-wip-us.apache.org/repos/asf/flink/blob/4f1c764e/test-infra/end-to-end-test/test_batch_wordcount.sh ---------------------------------------------------------------------- diff --git a/test-infra/end-to-end-test/test_batch_wordcount.sh b/test-infra/end-to-end-test/test_batch_wordcount.sh new file mode 100755 index 0000000..dfde5c6 --- /dev/null +++ b/test-infra/end-to-end-test/test_batch_wordcount.sh @@ -0,0 +1,41 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + + +set -e +set -o pipefail + +# Convert relative path to absolute path +TEST_ROOT=`pwd` +TEST_INFRA_DIR="$0" +TEST_INFRA_DIR=`dirname "$TEST_INFRA_DIR"` +cd $TEST_INFRA_DIR +TEST_INFRA_DIR=`pwd` +cd $TEST_ROOT + +. "$TEST_INFRA_DIR"/common.sh + +start_cluster + +$FLINK_DIR/bin/flink run -p 1 $FLINK_DIR/examples/batch/WordCount.jar --input $TEST_INFRA_DIR/test-data/words --output $TEST_DATA_DIR/out/wc_out +check_result_hash "WordCount" $TEST_DATA_DIR/out/wc_out "72a690412be8928ba239c2da967328a5" + +stop_cluster +clean_data_dir +check_all_pass \ No newline at end of file