[FLINK-9008] [e2e] Implements quickstarts end to end test
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/393eb679 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/393eb679 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/393eb679 Branch: refs/heads/master Commit: 393eb679d39658c2592cf736d7b50921ed6c5319 Parents: 71095dc Author: zhangminglei <[email protected]> Authored: Thu May 17 21:05:10 2018 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue May 22 16:00:17 2018 +0800 ---------------------------------------------------------------------- .../flink-quickstart-test/pom.xml | 49 +++++++ .../quickstart/ElasticsearchStreamingJob.java | 89 +++++++++++++ flink-end-to-end-tests/pom.xml | 1 + flink-end-to-end-tests/run-nightly-tests.sh | 8 ++ .../test-scripts/test_quickstarts.sh | 127 +++++++++++++++++++ 5 files changed, 274 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/393eb679/flink-end-to-end-tests/flink-quickstart-test/pom.xml ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-quickstart-test/pom.xml b/flink-end-to-end-tests/flink-quickstart-test/pom.xml new file mode 100644 index 0000000..59f5416 --- /dev/null +++ b/flink-end-to-end-tests/flink-quickstart-test/pom.xml @@ -0,0 +1,49 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-end-to-end-tests</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.6-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-quickstart-test</artifactId> + <name>flink-quickstart-test</name> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/393eb679/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java b/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java new file mode 100644 index 0000000..e20e045 --- /dev/null +++ 
b/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.quickstart; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Elasticsearch example for Flink Streaming Job. + * + * <p>In this streaming job, we generate a sequence of numbers, apply a map operator + * to convert each one to a string, and then use Elasticsearch as the sink to store the data. + * + * <p>Run test_quickstarts.sh to verify this program. 
Package this class to a jar, verify the jar, + * then deploy it on a flink cluster. + */ +public class ElasticsearchStreamingJob { + + public static void main(String[] args) throws Exception { + // set up the streaming execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<String> source = env.generateSequence(0, 20) + .map(new MapFunction<Long, String>() { + @Override + public String map(Long value) throws Exception { + return value.toString(); + }}); + + Map<String, String> userConfig = new HashMap<>(); + userConfig.put("cluster.name", "elasticsearch"); + // This instructs the sink to emit after every element, otherwise they would be buffered + userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + + List<InetSocketAddress> transports = new ArrayList<>(); + transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); + + source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>(){ + @Override + public void process(String element, RuntimeContext ctx, org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) { + indexer.add(createIndexRequest(element)); + } + })); + + // execute program + env.execute("Flink Streaming Job of writing data to elasticsearch"); + } + + private static IndexRequest createIndexRequest(String element) { + Map<String, Object> json = new HashMap<>(); + json.put("data", element); + + return Requests.indexRequest() + .index("my-index") + .type("my-type") + .id(element) + .source(json); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/393eb679/flink-end-to-end-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index 581abc8..8fb7eb8 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -47,6 +47,7 @@ under the License. 
<module>flink-elasticsearch1-test</module> <module>flink-elasticsearch2-test</module> <module>flink-elasticsearch5-test</module> + <module>flink-quickstart-test</module> </modules> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/393eb679/flink-end-to-end-tests/run-nightly-tests.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index 0ec3492..2a15e6d 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -191,5 +191,13 @@ if [ $EXIT_CODE == 0 ]; then EXIT_CODE=$? fi +if [ $EXIT_CODE == 0 ]; then + printf "\n==============================================================================\n" + printf "Running Quickstarts nightly end-to-end test\n" + printf "==============================================================================\n" + $END_TO_END_DIR/test-scripts/test_quickstarts.sh + EXIT_CODE=$? +fi + # Exit code for Travis build success/failure exit $EXIT_CODE http://git-wip-us.apache.org/repos/asf/flink/blob/393eb679/flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/test_quickstarts.sh b/flink-end-to-end-tests/test-scripts/test_quickstarts.sh new file mode 100755 index 0000000..8f7aaab --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_quickstarts.sh @@ -0,0 +1,127 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# End to end test for quick starts test. + +source "$(dirname "$0")"/common.sh + +mkdir -p $TEST_DATA_DIR + +cd $TEST_DATA_DIR + +mvn archetype:generate \ + -DarchetypeGroupId=org.apache.flink \ + -DarchetypeArtifactId=flink-quickstart-java \ + -DarchetypeVersion=1.6-SNAPSHOT \ + -DgroupId=org.apache.flink.quickstart \ + -DartifactId=flink-java-project \ + -Dversion=0.1 \ + -Dpackage=org.apache.flink.quickstart \ + -DinteractiveMode=false + +cd flink-java-project + +cp $TEST_DATA_DIR/../../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java $TEST_DATA_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/ + +position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1) + +sed -i -e ''"$(($position + 1))"'i\ +<dependency>\ +<groupId>org.apache.flink</groupId>\ +<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>\ +<version>${flink.version}</version>\ +</dependency>' pom.xml + +sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/" pom.xml + +mvn clean package -nsu + +cd target +jar tvf flink-java-project-0.1.jar > contentsInJar.txt + +if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && 
\ + `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then + + echo "Success: There are no flink core classes are contained in the jar." +else + echo "Failure: There are flink core classes are contained in the jar." + PASS="" + exit 1 +fi + +if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/quickstart/ElasticsearchStreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" contentsInJar.txt` -eq '0' ]]; then + + echo "Failure: Since ElasticsearchStreamingJob.class and other user classes are not included in the jar. " + PASS="" + exit 1 +else + echo "Success: ElasticsearchStreamingJob.class and other user classes are included in the jar." +fi + +ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz" + +curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz +tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/ +ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5 + +nohup $ELASTICSEARCH_DIR/bin/elasticsearch & + +ELASTICSEARCH_PROCESS=$(jps | grep Elasticsearch | awk '{print $2}') + +# make sure the elasticsearch node is actually running +if [ "$ELASTICSEARCH_PROCESS" != "Elasticsearch" ]; then + echo "Elasticsearch node is not running." + PASS="" + exit 1 +else + echo "Elasticsearch node is running." 
+fi + +TEST_PROGRAM_JAR=$TEST_DATA_DIR/flink-java-project/target/flink-java-project-0.1.jar + +start_cluster + +# run the Flink job +$FLINK_DIR/bin/flink run -c org.apache.flink.quickstart.ElasticsearchStreamingJob $TEST_PROGRAM_JAR + +curl 'localhost:9200/my-index/_search?q=*&pretty&size=21' > $TEST_DATA_DIR/output + +if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then + echo "Quickstarts end to end test pass." +else + echo "Quickstarts end to end test failed." + PASS="" + exit 1 +fi + +function shutdownAndCleanup { + pid=$(jps | grep Elasticsearch | awk '{print $1}') + kill -SIGTERM $pid + + # make sure to run regular cleanup as well + cleanup +} + +trap shutdownAndCleanup INT +trap shutdownAndCleanup EXIT
