[FLINK-8989] [e2e] Cleanup / improve Elasticsearch e2e tests - Rework e2e test job modules to have correct Maven POM - Parameterize num of records to write to Elasticsearch - Parameterize Elasticsearch download URL and version in test script - Improve robustness of test - Move more Elasticsearch functionality to elasticsearch-common.sh
This closes #5761. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e434eeb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e434eeb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e434eeb Branch: refs/heads/release-1.5 Commit: 3e434eeb1dd66f568d859b999360ebc6769cade1 Parents: a7abfcb Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Tue May 22 15:10:32 2018 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue May 22 16:51:38 2018 +0800 ---------------------------------------------------------------------- .../flink-elasticsearch1-test/pom.xml | 45 ++------ .../tests/Elasticsearch1SinkExample.java | 18 +-- .../flink-elasticsearch2-test/pom.xml | 65 ++--------- .../tests/Elasticsearch2SinkExample.java | 17 +-- .../flink-elasticsearch5-test/pom.xml | 78 ++----------- .../tests/Elasticsearch5SinkExample.java | 18 +-- flink-end-to-end-tests/run-nightly-tests.sh | 23 +++- .../test-scripts/elasticsearch-common.sh | 48 +++++--- .../test_streaming_elasticsearch.sh | 51 +++++++++ .../test_streaming_elasticsearch125.sh | 109 ------------------- 10 files changed, 167 insertions(+), 305 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml index 1960f05..6ac8e71 100644 --- a/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml +++ b/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml @@ -21,16 +21,16 @@ under the License. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> - <artifactId>flink-end-to-end-tests</artifactId> <groupId>org.apache.flink</groupId> + <artifactId>flink-end-to-end-tests</artifactId> <version>1.5-SNAPSHOT</version> <relativePath>..</relativePath> </parent> - <modelVersion>4.0.0</modelVersion> - - <artifactId>flink-elasticsearch1-test_${scala.binary.version}</artifactId> + <artifactId>flink-elasticsearch1-test</artifactId> <name>flink-elasticsearch1-test</name> <packaging>jar</packaging> @@ -41,7 +41,6 @@ under the License. <version>${project.version}</version> <scope>provided</scope> </dependency> - <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch_${scala.binary.version}</artifactId> @@ -56,26 +55,18 @@ under the License. <artifactId>maven-shade-plugin</artifactId> <version>3.0.0</version> <executions> - <!-- Elasticsearch1Sink end to end example --> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> - <minimizeJar>true</minimizeJar> + <finalName>Elasticsearch1SinkExample</finalName> <artifactSet> <excludes> <exclude>com.google.code.findbugs:jsr305</exclude> - <exclude>org.slf4j:*</exclude> - <exclude>log4j:*</exclude> </excludes> </artifactSet> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - <mainClass>org.apache.flink.streaming.tests.Elasticsearch1SinkExample</mainClass> - </transformer> - </transformers> <filters> <filter> <artifact>*:*</artifact> @@ -86,27 +77,11 @@ under the License. </excludes> </filter> </filters> - </configuration> - </execution> - </executions> - </plugin> - - <!--simplify the name of the testing JAR for referring to it in test_streaming_elasticsearch1.sh scripts--> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <id>rename</id> - <phase>package</phase> - <goals> - <goal>run</goal> - </goals> - <configuration> - <target> - <copy file="${project.basedir}/target/flink-elasticsearch1-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch1SinkExample.jar" /> - </target> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.flink.streaming.tests.Elasticsearch1SinkExample</mainClass> + </transformer> + </transformers> </configuration> </execution> </executions> http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java index bfdb806..18fa05a 100644 --- a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java +++ b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java @@ -41,13 +41,14 @@ import java.util.Map; * End to end test for Elasticsearch1Sink. */ public class Elasticsearch1SinkExample { + public static void main(String[] args) throws Exception { final ParameterTool parameterTool = ParameterTool.fromArgs(args); - if (parameterTool.getNumberOfParameters() < 2) { + if (parameterTool.getNumberOfParameters() < 3) { System.out.println("Missing parameters!\n" + - "Usage: --index <index> --type <type>"); + "Usage: --numRecords <numRecords> --index <index> --type <type>"); return; } @@ -55,12 +56,13 @@ public class Elasticsearch1SinkExample { env.getConfig().disableSysoutLogging(); env.enableCheckpointing(5000); - DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() { - @Override - public String map(Long value) throws Exception { - return "message # " + value; - } - }); + DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1) + .map(new MapFunction<Long, String>() { + @Override + public String map(Long value) throws Exception { + return "message # " + value; + } + }); Map<String, String> userConfig = new HashMap<>(); userConfig.put("cluster.name", "elasticsearch"); http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml index 4fd93de..d6a1abf 100644 --- a/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml +++ b/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml @@ -20,16 +20,17 @@ under the License. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <parent> - <artifactId>flink-end-to-end-tests</artifactId> <groupId>org.apache.flink</groupId> + <artifactId>flink-end-to-end-tests</artifactId> <version>1.5-SNAPSHOT</version> <relativePath>..</relativePath> </parent> - <modelVersion>4.0.0</modelVersion> - - <artifactId>flink-elasticsearch2-test_${scala.binary.version}</artifactId> + <artifactId>flink-elasticsearch2-test</artifactId> <name>flink-elasticsearch2-test</name> <packaging>jar</packaging> @@ -40,31 +41,11 @@ under the License. <version>${project.version}</version> <scope>provided</scope> </dependency> - <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <!-- Remove elasticsearch1.7.1 --> - <exclusions> - <exclusion> - <groupId>org.elasticsearch</groupId> - <artifactId>elasticsearch</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.elasticsearch</groupId> - <artifactId>elasticsearch</artifactId> - <version>2.3.5</version> - </dependency> </dependencies> <build> @@ -74,26 +55,18 @@ under the License. <artifactId>maven-shade-plugin</artifactId> <version>3.0.0</version> <executions> - <!-- Elasticsearch2Sink end to end example --> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> - <minimizeJar>true</minimizeJar> + <finalName>Elasticsearch2SinkExample</finalName> <artifactSet> <excludes> <exclude>com.google.code.findbugs:jsr305</exclude> - <exclude>org.slf4j:*</exclude> - <exclude>log4j:*</exclude> </excludes> </artifactSet> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - <mainClass>org.apache.flink.streaming.tests.Elasticsearch2SinkExample</mainClass> - </transformer> - </transformers> <filters> <filter> <artifact>*:*</artifact> @@ -104,27 +77,11 @@ under the License. </excludes> </filter> </filters> - </configuration> - </execution> - </executions> - </plugin> - - <!--simplify the name of the testing JAR for referring to it in test_streaming_elasticsearch2.sh scripts--> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <id>rename</id> - <phase>package</phase> - <goals> - <goal>run</goal> - </goals> - <configuration> - <target> - <copy file="${project.basedir}/target/flink-elasticsearch2-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch2SinkExample.jar" /> - </target> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.flink.streaming.tests.Elasticsearch2SinkExample</mainClass> + </transformer> + </transformers> </configuration> </execution> </executions> http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java index 4ec03aa..f7532b1 100644 --- a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java +++ b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java @@ -44,9 +44,9 @@ public class Elasticsearch2SinkExample { final ParameterTool parameterTool = ParameterTool.fromArgs(args); - if (parameterTool.getNumberOfParameters() < 2) { + if (parameterTool.getNumberOfParameters() < 3) { System.out.println("Missing parameters!\n" + - "Usage: --index <index> --type <type>"); + "Usage: --numRecords --index <index> --type <type>"); return; } @@ -54,12 +54,13 @@ public class Elasticsearch2SinkExample { env.getConfig().disableSysoutLogging(); env.enableCheckpointing(5000); - DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() { - @Override - public String map(Long value) throws Exception { - return "message #" + value; - } - }); + DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1) + .map(new MapFunction<Long, String>() { + @Override + public String map(Long value) throws Exception { + return "message #" + value; + } + }); Map<String, String> userConfig = new HashMap<>(); userConfig.put("cluster.name", "elasticsearch"); http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml index 3a1e734..33241b0 100644 --- a/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml +++ b/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml @@ -20,15 +20,17 @@ under the License. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <parent> - <artifactId>flink-end-to-end-tests</artifactId> <groupId>org.apache.flink</groupId> + <artifactId>flink-end-to-end-tests</artifactId> <version>1.5-SNAPSHOT</version> <relativePath>..</relativePath> </parent> - <modelVersion>4.0.0</modelVersion> - <artifactId>flink-elasticsearch5-test_${scala.binary.version}</artifactId> + <artifactId>flink-elasticsearch5-test</artifactId> <name>flink-elasticsearch5-test</name> <packaging>jar</packaging> @@ -39,45 +41,11 @@ under the License. <version>${project.version}</version> <scope>provided</scope> </dependency> - <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <exclusions> - <!-- Remove elasticsearch1.7.1 --> - <exclusion> - <groupId>org.elasticsearch</groupId> - <artifactId>elasticsearch</artifactId> - </exclusion> - </exclusions> - </dependency> - - <!-- Dependency for Elasticsearch 5.x Java Client --> - <dependency> - <groupId>org.elasticsearch.client</groupId> - <artifactId>transport</artifactId> - <version>5.1.2</version> - </dependency> - - <!-- - Elasticsearch 5.x uses Log4j2 and no longer detects logging implementations, making - Log4j2 a strict dependency. The following is added so that the Log4j2 API in - Elasticsearch 5.x is routed to SLF4J. This way, user projects can remain flexible - in the logging implementation preferred. - --> - - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-to-slf4j</artifactId> - <version>2.7</version> - </dependency> </dependencies> <build> @@ -87,26 +55,18 @@ under the License. <artifactId>maven-shade-plugin</artifactId> <version>3.0.0</version> <executions> - <!-- Elasticsearch5Sink end to end example --> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> - <minimizeJar>true</minimizeJar> + <finalName>Elasticsearch5SinkExample</finalName> <artifactSet> <excludes> <exclude>com.google.code.findbugs:jsr305</exclude> - <exclude>org.slf4j:*</exclude> - <exclude>log4j:*</exclude> </excludes> </artifactSet> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - <mainClass>org.apache.flink.streaming.tests.Elasticsearch5SinkExample</mainClass> - </transformer> - </transformers> <filters> <filter> <artifact>*:*</artifact> @@ -117,27 +77,11 @@ under the License. </excludes> </filter> </filters> - </configuration> - </execution> - </executions> - </plugin> - - <!--simplify the name of the testing JAR for referring to it in test_streaming_elasticsearch5.sh scripts--> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <id>rename</id> - <phase>package</phase> - <goals> - <goal>run</goal> - </goals> - <configuration> - <target> - <copy file="${project.basedir}/target/flink-elasticsearch5-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch5SinkExample.jar" /> - </target> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.flink.streaming.tests.Elasticsearch5SinkExample</mainClass> + </transformer> + </transformers> </configuration> </execution> </executions> http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java index 285f902..39808f6 100644 --- a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java +++ b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java @@ -40,13 +40,14 @@ import java.util.Map; * End to end test for Elasticsearch5Sink. */ public class Elasticsearch5SinkExample { + public static void main(String[] args) throws Exception { final ParameterTool parameterTool = ParameterTool.fromArgs(args); - if (parameterTool.getNumberOfParameters() < 2) { + if (parameterTool.getNumberOfParameters() < 3) { System.out.println("Missing parameters!\n" + - "Usage: --index <index> --type <type>"); + "Usage: --numRecords <numRecords> --index <index> --type <type>"); return; } @@ -54,12 +55,13 @@ public class Elasticsearch5SinkExample { env.getConfig().disableSysoutLogging(); env.enableCheckpointing(5000); - DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() { - @Override - public String map(Long value) throws Exception { - return "message #" + value; - } - }); + DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1) + .map(new MapFunction<Long, String>() { + @Override + public String map(Long value) throws Exception { + return "message #" + value; + } + }); Map<String, String> userConfig = new HashMap<>(); userConfig.put("cluster.name", "elasticsearch"); http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/run-nightly-tests.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index 2898682..0ec3492 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -158,7 +158,28 @@ if [ $EXIT_CODE == 0 ]; then fi if [ $EXIT_CODE == 0 ]; then - run_test "stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4" + run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4" + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + run_test \ + "Elasticsearch (v1.7.1) sink end-to-end test" \ + "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1 https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz" + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + run_test \ + "Elasticsearch (v2.3.5) sink end-to-end test" \ + "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2 https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz" + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + run_test \ + "Elasticsearch (v5.1.2) sink end-to-end test" \ + "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz" EXIT_CODE=$? fi http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh index 3fda344..0ef6d55 100644 --- a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh +++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh @@ -20,15 +20,32 @@ set -o pipefail if [[ -z $TEST_DATA_DIR ]]; then - echo "Must run common.sh before kafka-common.sh." + echo "Must run common.sh before elasticsearch-common.sh." exit 1 fi +function setup_elasticsearch { + mkdir -p $TEST_DATA_DIR + + local downloadUrl=$1 + + # start downloading Elasticsearch + echo "Downloading Elasticsearch from $downloadUrl ..." + curl "$downloadUrl" > $TEST_DATA_DIR/elasticsearch.tar.gz + + local elasticsearchDir=$TEST_DATA_DIR/elasticsearch + mkdir -p $elasticsearchDir + tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $elasticsearchDir --strip-components=1 + + # start Elasticsearch cluster + $elasticsearchDir/bin/elasticsearch & +} + function verify_elasticsearch_process_exist { - ELASTICSEARCH_PROCESS=$(jps | grep Elasticsearch | awk '{print $2}') + local elasticsearchProcess=$(jps | grep Elasticsearch | awk '{print $2}') # make sure the elasticsearch node is actually running - if [ "$ELASTICSEARCH_PROCESS" != "Elasticsearch" ]; then + if [ "$elasticsearchProcess" != "Elasticsearch" ]; then echo "Elasticsearch node is not running." PASS="" exit 1 @@ -38,25 +55,26 @@ function verify_elasticsearch_process_exist { } function verify_result { + local numRecords=$1 + if [ -f "$TEST_DATA_DIR/output" ]; then rm $TEST_DATA_DIR/output fi - curl 'localhost:9200/index/_search?q=*&pretty&size=21' > $TEST_DATA_DIR/output + while : ; do + curl 'localhost:9200/index/_search?q=*&pretty&size=21' > $TEST_DATA_DIR/output - if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then - echo "Elasticsearch end to end test pass." - else - echo "Elasticsearch end to end test failed." - PASS="" - exit 1 - fi + if [ -n "$(grep "\"total\" : $numRecords" $TEST_DATA_DIR/output)" ]; then + echo "Elasticsearch end to end test pass." + break + else + echo "Waiting for Elasticsearch records ..." + sleep 1 + fi + done } function shutdown_elasticsearch_cluster { pid=$(jps | grep Elasticsearch | awk '{print $1}') - kill -SIGTERM $pid - - # make sure to run regular cleanup as well - cleanup + kill -9 $pid } http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh new file mode 100755 index 0000000..78ea283 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh @@ -0,0 +1,51 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +source "$(dirname "$0")"/common.sh +source "$(dirname "$0")"/elasticsearch-common.sh + +ELASTICSEARCH_VERSION=$1 +DOWNLOAD_URL=$2 + +mkdir -p $TEST_DATA_DIR + +setup_elasticsearch $DOWNLOAD_URL +verify_elasticsearch_process_exist + +start_cluster + +function test_cleanup { + shutdown_elasticsearch_cluster + + # make sure to run regular cleanup as well + cleanup +} + +trap test_cleanup INT +trap test_cleanup EXIT + +TEST_ES_JAR=$TEST_DATA_DIR/../../flink-elasticsearch${ELASTICSEARCH_VERSION}-test/target/Elasticsearch${ELASTICSEARCH_VERSION}SinkExample.jar + +# run the Flink job +$FLINK_DIR/bin/flink run -p 1 $TEST_ES_JAR \ + --numRecords 20 \ + --index index \ + --type type + +verify_result 20 http://git-wip-us.apache.org/repos/asf/flink/blob/3e434eeb/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh deleted file mode 100755 index dea3f13..0000000 --- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh +++ /dev/null @@ -1,109 +0,0 @@ -#!/usr/bin/env bash -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -source "$(dirname "$0")"/common.sh -source "$(dirname "$0")"/elasticsearch-common.sh - -mkdir -p $TEST_DATA_DIR - -ELASTICSEARCH1_URL="https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz" -ELASTICSEARCH2_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz" -ELASTICSEARCH5_URL="https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz" - -# start downloading elasticsearch1 -echo "Downloading Elasticsearch1 from $ELASTICSEARCH1_URL" -curl "$ELASTICSEARCH1_URL" > $TEST_DATA_DIR/elasticsearch1.tar.gz - -tar xzf $TEST_DATA_DIR/elasticsearch1.tar.gz -C $TEST_DATA_DIR/ -ELASTICSEARCH1_DIR=$TEST_DATA_DIR/elasticsearch-1.7.1 - -# start elasticsearch1 cluster -$ELASTICSEARCH1_DIR/bin/elasticsearch -daemon - -verify_elasticsearch_process_exist - -start_cluster - -TEST_ES1_JAR=$TEST_DATA_DIR/../../flink-elasticsearch1-test/target/Elasticsearch1SinkExample.jar - -# run the Flink job -$FLINK_DIR/bin/flink run -p 1 $TEST_ES1_JAR \ - --index index \ - --type type - -verify_result - -shutdown_elasticsearch_cluster - -mkdir -p $TEST_DATA_DIR - -# start downloading elasticsearch2 -echo "Downloading Elasticsearch2 from $ELASTICSEARCH2_URL" -curl "$ELASTICSEARCH2_URL" > $TEST_DATA_DIR/elasticsearch2.tar.gz - -tar xzf $TEST_DATA_DIR/elasticsearch2.tar.gz -C $TEST_DATA_DIR/ -ELASTICSEARCH2_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5 - -# start elasticsearch cluster, different from elasticsearch1 since using -daemon here will hang the shell. -nohup $ELASTICSEARCH2_DIR/bin/elasticsearch & - -verify_elasticsearch_process_exist - -start_cluster - -TEST_ES2_JAR=$TEST_DATA_DIR/../../flink-elasticsearch2-test/target/Elasticsearch2SinkExample.jar - -# run the Flink job -$FLINK_DIR/bin/flink run -p 1 $TEST_ES2_JAR \ - --index index \ - --type type - -verify_result - -shutdown_elasticsearch_cluster - -mkdir -p $TEST_DATA_DIR - -# start downloading elasticsearch5 -echo "Downloading Elasticsearch5 from $ELASTICSEARCH5_URL" -curl "$ELASTICSEARCH5_URL" > $TEST_DATA_DIR/elasticsearch5.tar.gz - -tar xzf $TEST_DATA_DIR/elasticsearch5.tar.gz -C $TEST_DATA_DIR/ -ELASTICSEARCH5_DIR=$TEST_DATA_DIR/elasticsearch-5.1.2 - -# start elasticsearch cluster, different from elasticsearch1 since using -daemon here will hang the shell. -nohup $ELASTICSEARCH5_DIR/bin/elasticsearch & - -verify_elasticsearch_process_exist - -start_cluster - -TEST_ES5_JAR=$TEST_DATA_DIR/../../flink-elasticsearch5-test/target/Elasticsearch5SinkExample.jar - -# run the Flink job -$FLINK_DIR/bin/flink run -p 1 $TEST_ES5_JAR \ - --index index \ - --type type - -verify_result - -rm -rf $FLINK_DIR/log/* 2> /dev/null - -trap shutdown_elasticsearch_cluster INT -trap shutdown_elasticsearch_cluster EXIT
