[FLINK-8989] [e2e] Cleanup / improve Elasticsearch e2e tests

- Rework e2e test job modules to have correct Maven POM
- Parameterize num of records to write to Elasticsearch
- Parameterize Elasticsearch download URL and version in test script
- Improve robustness of test
- Move more Elasticsearch functionality to elasticsearch-common.sh

This closes #5761.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71095dcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71095dcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71095dcb

Branch: refs/heads/master
Commit: 71095dcb098c5b03a656a1f3bb48634294e537bb
Parents: d5bb60d
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Tue May 22 15:10:32 2018 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue May 22 16:00:12 2018 +0800

----------------------------------------------------------------------
 .../flink-elasticsearch1-test/pom.xml           |  45 ++------
 .../tests/Elasticsearch1SinkExample.java        |  18 +--
 .../flink-elasticsearch2-test/pom.xml           |  65 ++---------
 .../tests/Elasticsearch2SinkExample.java        |  17 +--
 .../flink-elasticsearch5-test/pom.xml           |  78 ++-----------
 .../tests/Elasticsearch5SinkExample.java        |  18 +--
 flink-end-to-end-tests/run-nightly-tests.sh     |  23 +++-
 .../test-scripts/elasticsearch-common.sh        |  48 +++++---
 .../test_streaming_elasticsearch.sh             |  51 +++++++++
 .../test_streaming_elasticsearch125.sh          | 109 -------------------
 10 files changed, 167 insertions(+), 305 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/71095dcb/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml 
b/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
index b983e72..7c46ae1 100644
--- a/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
+++ b/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
@@ -21,16 +21,16 @@ under the License.
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
                 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
 
+       <modelVersion>4.0.0</modelVersion>
+
        <parent>
-               <artifactId>flink-end-to-end-tests</artifactId>
                <groupId>org.apache.flink</groupId>
+               <artifactId>flink-end-to-end-tests</artifactId>
                <version>1.6-SNAPSHOT</version>
                <relativePath>..</relativePath>
        </parent>
 
-       <modelVersion>4.0.0</modelVersion>
-
-       
<artifactId>flink-elasticsearch1-test_${scala.binary.version}</artifactId>
+       <artifactId>flink-elasticsearch1-test</artifactId>
        <name>flink-elasticsearch1-test</name>
        <packaging>jar</packaging>
 
@@ -41,7 +41,6 @@ under the License.
                        <version>${project.version}</version>
                        <scope>provided</scope>
                </dependency>
-
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-connector-elasticsearch_${scala.binary.version}</artifactId>
@@ -56,26 +55,18 @@ under the License.
                                <artifactId>maven-shade-plugin</artifactId>
                                <version>3.0.0</version>
                                <executions>
-                                       <!-- Elasticsearch1Sink end to end 
example -->
                                        <execution>
                                                <phase>package</phase>
                                                <goals>
                                                        <goal>shade</goal>
                                                </goals>
                                                <configuration>
-                                                       
<minimizeJar>true</minimizeJar>
+                                                       
<finalName>Elasticsearch1SinkExample</finalName>
                                                        <artifactSet>
                                                                <excludes>
                                                                        
<exclude>com.google.code.findbugs:jsr305</exclude>
-                                                                       
<exclude>org.slf4j:*</exclude>
-                                                                       
<exclude>log4j:*</exclude>
                                                                </excludes>
                                                        </artifactSet>
-                                                       <transformers>
-                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                                                       
<mainClass>org.apache.flink.streaming.tests.Elasticsearch1SinkExample</mainClass>
-                                                               </transformer>
-                                                       </transformers>
                                                        <filters>
                                                                <filter>
                                                                        
<artifact>*:*</artifact>
@@ -86,27 +77,11 @@ under the License.
                                                                        
</excludes>
                                                                </filter>
                                                        </filters>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-
-                       <!--simplify the name of the testing JAR for referring 
to it in test_streaming_elasticsearch1.sh scripts-->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-antrun-plugin</artifactId>
-                               <version>1.7</version>
-                               <executions>
-                                       <execution>
-                                               <id>rename</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>run</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <target>
-                                                               <copy 
file="${project.basedir}/target/flink-elasticsearch1-test_${scala.binary.version}-${project.version}.jar"
 tofile="${project.basedir}/target/Elasticsearch1SinkExample.jar" />
-                                                       </target>
+                                                       <transformers>
+                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       
<mainClass>org.apache.flink.streaming.tests.Elasticsearch1SinkExample</mainClass>
+                                                               </transformer>
+                                                       </transformers>
                                                </configuration>
                                        </execution>
                                </executions>

http://git-wip-us.apache.org/repos/asf/flink/blob/71095dcb/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
 
b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
index bfdb806..18fa05a 100644
--- 
a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
+++ 
b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
@@ -41,13 +41,14 @@ import java.util.Map;
  * End to end test for Elasticsearch1Sink.
  */
 public class Elasticsearch1SinkExample {
+
        public static void main(String[] args) throws Exception {
 
                final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);
 
-               if (parameterTool.getNumberOfParameters() < 2) {
+               if (parameterTool.getNumberOfParameters() < 3) {
                        System.out.println("Missing parameters!\n" +
-                               "Usage: --index <index> --type <type>");
+                               "Usage: --numRecords <numRecords> --index 
<index> --type <type>");
                        return;
                }
 
@@ -55,12 +56,13 @@ public class Elasticsearch1SinkExample {
                env.getConfig().disableSysoutLogging();
                env.enableCheckpointing(5000);
 
-               DataStream<String> source = env.generateSequence(0, 20).map(new 
MapFunction<Long, String>() {
-                       @Override
-                       public String map(Long value) throws Exception {
-                               return "message # " + value;
-                       }
-               });
+               DataStream<String> source = env.generateSequence(0, 
parameterTool.getInt("numRecords") - 1)
+                       .map(new MapFunction<Long, String>() {
+                               @Override
+                               public String map(Long value) throws Exception {
+                                       return "message # " + value;
+                               }
+                       });
 
                Map<String, String> userConfig = new HashMap<>();
                userConfig.put("cluster.name", "elasticsearch");

http://git-wip-us.apache.org/repos/asf/flink/blob/71095dcb/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml 
b/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
index 178d632..4997910 100644
--- a/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
+++ b/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
@@ -20,16 +20,17 @@ under the License.
 <project xmlns="http://maven.apache.org/POM/4.0.0";
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
                 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
        <parent>
-               <artifactId>flink-end-to-end-tests</artifactId>
                <groupId>org.apache.flink</groupId>
+               <artifactId>flink-end-to-end-tests</artifactId>
                <version>1.6-SNAPSHOT</version>
                <relativePath>..</relativePath>
        </parent>
 
-       <modelVersion>4.0.0</modelVersion>
-
-       
<artifactId>flink-elasticsearch2-test_${scala.binary.version}</artifactId>
+       <artifactId>flink-elasticsearch2-test</artifactId>
        <name>flink-elasticsearch2-test</name>
        <packaging>jar</packaging>
 
@@ -40,31 +41,11 @@ under the License.
                        <version>${project.version}</version>
                        <scope>provided</scope>
                </dependency>
-
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <!-- Remove elasticsearch1.7.1 -->
-                       <exclusions>
-                               <exclusion>
-                                       <groupId>org.elasticsearch</groupId>
-                                       <artifactId>elasticsearch</artifactId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.elasticsearch</groupId>
-                       <artifactId>elasticsearch</artifactId>
-                       <version>2.3.5</version>
-               </dependency>
        </dependencies>
 
        <build>
@@ -74,26 +55,18 @@ under the License.
                                <artifactId>maven-shade-plugin</artifactId>
                                <version>3.0.0</version>
                                <executions>
-                                       <!-- Elasticsearch2Sink end to end 
example -->
                                        <execution>
                                                <phase>package</phase>
                                                <goals>
                                                        <goal>shade</goal>
                                                </goals>
                                                <configuration>
-                                                       
<minimizeJar>true</minimizeJar>
+                                                       
<finalName>Elasticsearch2SinkExample</finalName>
                                                        <artifactSet>
                                                                <excludes>
                                                                        
<exclude>com.google.code.findbugs:jsr305</exclude>
-                                                                       
<exclude>org.slf4j:*</exclude>
-                                                                       
<exclude>log4j:*</exclude>
                                                                </excludes>
                                                        </artifactSet>
-                                                       <transformers>
-                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                                                       
<mainClass>org.apache.flink.streaming.tests.Elasticsearch2SinkExample</mainClass>
-                                                               </transformer>
-                                                       </transformers>
                                                        <filters>
                                                                <filter>
                                                                        
<artifact>*:*</artifact>
@@ -104,27 +77,11 @@ under the License.
                                                                        
</excludes>
                                                                </filter>
                                                        </filters>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-
-                       <!--simplify the name of the testing JAR for referring 
to it in test_streaming_elasticsearch2.sh scripts-->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-antrun-plugin</artifactId>
-                               <version>1.7</version>
-                               <executions>
-                                       <execution>
-                                               <id>rename</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>run</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <target>
-                                                               <copy 
file="${project.basedir}/target/flink-elasticsearch2-test_${scala.binary.version}-${project.version}.jar"
 tofile="${project.basedir}/target/Elasticsearch2SinkExample.jar" />
-                                                       </target>
+                                                       <transformers>
+                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       
<mainClass>org.apache.flink.streaming.tests.Elasticsearch2SinkExample</mainClass>
+                                                               </transformer>
+                                                       </transformers>
                                                </configuration>
                                        </execution>
                                </executions>

http://git-wip-us.apache.org/repos/asf/flink/blob/71095dcb/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
 
b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
index 4ec03aa..f7532b1 100644
--- 
a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
+++ 
b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
@@ -44,9 +44,9 @@ public class Elasticsearch2SinkExample {
 
                final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);
 
-               if (parameterTool.getNumberOfParameters() < 2) {
+               if (parameterTool.getNumberOfParameters() < 3) {
                        System.out.println("Missing parameters!\n" +
-                               "Usage: --index <index> --type <type>");
+                               "Usage: --numRecords --index <index> --type 
<type>");
                        return;
                }
 
@@ -54,12 +54,13 @@ public class Elasticsearch2SinkExample {
                env.getConfig().disableSysoutLogging();
                env.enableCheckpointing(5000);
 
-               DataStream<String> source = env.generateSequence(0, 20).map(new 
MapFunction<Long, String>() {
-                       @Override
-                       public String map(Long value) throws Exception {
-                               return "message #" + value;
-                       }
-               });
+               DataStream<String> source = env.generateSequence(0, 
parameterTool.getInt("numRecords") - 1)
+                       .map(new MapFunction<Long, String>() {
+                               @Override
+                               public String map(Long value) throws Exception {
+                                       return "message #" + value;
+                               }
+                       });
 
                Map<String, String> userConfig = new HashMap<>();
                userConfig.put("cluster.name", "elasticsearch");

http://git-wip-us.apache.org/repos/asf/flink/blob/71095dcb/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml 
b/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
index 5b03a7f..05a621f 100644
--- a/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
+++ b/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
@@ -20,15 +20,17 @@ under the License.
 <project xmlns="http://maven.apache.org/POM/4.0.0";
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
                 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
        <parent>
-               <artifactId>flink-end-to-end-tests</artifactId>
                <groupId>org.apache.flink</groupId>
+               <artifactId>flink-end-to-end-tests</artifactId>
                <version>1.6-SNAPSHOT</version>
                <relativePath>..</relativePath>
        </parent>
-       <modelVersion>4.0.0</modelVersion>
 
-       
<artifactId>flink-elasticsearch5-test_${scala.binary.version}</artifactId>
+       <artifactId>flink-elasticsearch5-test</artifactId>
        <name>flink-elasticsearch5-test</name>
        <packaging>jar</packaging>
 
@@ -39,45 +41,11 @@ under the License.
                        <version>${project.version}</version>
                        <scope>provided</scope>
                </dependency>
-
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <exclusions>
-                               <!-- Remove elasticsearch1.7.1 -->
-                               <exclusion>
-                                       <groupId>org.elasticsearch</groupId>
-                                       <artifactId>elasticsearch</artifactId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-
-               <!-- Dependency for Elasticsearch 5.x Java Client -->
-               <dependency>
-                       <groupId>org.elasticsearch.client</groupId>
-                       <artifactId>transport</artifactId>
-                       <version>5.1.2</version>
-               </dependency>
-
-               <!--
-                       Elasticsearch 5.x uses Log4j2 and no longer detects 
logging implementations, making
-                       Log4j2 a strict dependency. The following is added so 
that the Log4j2 API in
-                       Elasticsearch 5.x is routed to SLF4J. This way, user 
projects can remain flexible
-                       in the logging implementation preferred.
-               -->
-
-               <dependency>
-                       <groupId>org.apache.logging.log4j</groupId>
-                       <artifactId>log4j-to-slf4j</artifactId>
-                       <version>2.7</version>
-               </dependency>
        </dependencies>
 
        <build>
@@ -87,26 +55,18 @@ under the License.
                                <artifactId>maven-shade-plugin</artifactId>
                                <version>3.0.0</version>
                                <executions>
-                                       <!-- Elasticsearch5Sink end to end 
example -->
                                        <execution>
                                                <phase>package</phase>
                                                <goals>
                                                        <goal>shade</goal>
                                                </goals>
                                                <configuration>
-                                                       
<minimizeJar>true</minimizeJar>
+                                                       
<finalName>Elasticsearch5SinkExample</finalName>
                                                        <artifactSet>
                                                                <excludes>
                                                                        
<exclude>com.google.code.findbugs:jsr305</exclude>
-                                                                       
<exclude>org.slf4j:*</exclude>
-                                                                       
<exclude>log4j:*</exclude>
                                                                </excludes>
                                                        </artifactSet>
-                                                       <transformers>
-                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                                                       
<mainClass>org.apache.flink.streaming.tests.Elasticsearch5SinkExample</mainClass>
-                                                               </transformer>
-                                                       </transformers>
                                                        <filters>
                                                                <filter>
                                                                        
<artifact>*:*</artifact>
@@ -117,27 +77,11 @@ under the License.
                                                                        
</excludes>
                                                                </filter>
                                                        </filters>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-
-                       <!--simplify the name of the testing JAR for referring 
to it in test_streaming_elasticsearch5.sh scripts-->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-antrun-plugin</artifactId>
-                               <version>1.7</version>
-                               <executions>
-                                       <execution>
-                                               <id>rename</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>run</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <target>
-                                                               <copy 
file="${project.basedir}/target/flink-elasticsearch5-test_${scala.binary.version}-${project.version}.jar"
 tofile="${project.basedir}/target/Elasticsearch5SinkExample.jar" />
-                                                       </target>
+                                                       <transformers>
+                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       
<mainClass>org.apache.flink.streaming.tests.Elasticsearch5SinkExample</mainClass>
+                                                               </transformer>
+                                                       </transformers>
                                                </configuration>
                                        </execution>
                                </executions>

http://git-wip-us.apache.org/repos/asf/flink/blob/71095dcb/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
 
b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
index 285f902..39808f6 100644
--- 
a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
+++ 
b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
@@ -40,13 +40,14 @@ import java.util.Map;
  * End to end test for Elasticsearch5Sink.
  */
 public class Elasticsearch5SinkExample {
+
        public static void main(String[] args) throws Exception {
 
                final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);
 
-               if (parameterTool.getNumberOfParameters() < 2) {
+               if (parameterTool.getNumberOfParameters() < 3) {
                        System.out.println("Missing parameters!\n" +
-                               "Usage: --index <index> --type <type>");
+                               "Usage: --numRecords <numRecords> --index 
<index> --type <type>");
                        return;
                }
 
@@ -54,12 +55,13 @@ public class Elasticsearch5SinkExample {
                env.getConfig().disableSysoutLogging();
                env.enableCheckpointing(5000);
 
-               DataStream<String> source = env.generateSequence(0, 20).map(new 
MapFunction<Long, String>() {
-                       @Override
-                       public String map(Long value) throws Exception {
-                               return "message #" + value;
-                       }
-               });
+               DataStream<String> source = env.generateSequence(0, 
parameterTool.getInt("numRecords") - 1)
+                       .map(new MapFunction<Long, String>() {
+                               @Override
+                               public String map(Long value) throws Exception {
+                                       return "message #" + value;
+                               }
+                       });
 
                Map<String, String> userConfig = new HashMap<>();
                userConfig.put("cluster.name", "elasticsearch");

http://git-wip-us.apache.org/repos/asf/flink/blob/71095dcb/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh 
b/flink-end-to-end-tests/run-nightly-tests.sh
index 2898682..0ec3492 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -158,7 +158,28 @@ if [ $EXIT_CODE == 0 ]; then
 fi
 
 if [ $EXIT_CODE == 0 ]; then
-  run_test "stateful stream job upgrade end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
+  run_test "Stateful stream job upgrade end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  run_test \
+    "Elasticsearch (v1.7.1) sink end-to-end test" \
+    "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1 
https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz";
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  run_test \
+    "Elasticsearch (v2.3.5) sink end-to-end test" \
+    "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2 
https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz";
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  run_test \
+    "Elasticsearch (v5.1.2) sink end-to-end test" \
+    "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz";
   EXIT_CODE=$?
 fi
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71095dcb/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh 
b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
index 3fda344..0ef6d55 100644
--- a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -20,15 +20,32 @@
 set -o pipefail
 
 if [[ -z $TEST_DATA_DIR ]]; then
-  echo "Must run common.sh before kafka-common.sh."
+  echo "Must run common.sh before elasticsearch-common.sh."
   exit 1
 fi
 
+function setup_elasticsearch {
+    mkdir -p $TEST_DATA_DIR
+
+    local downloadUrl=$1
+
+    # start downloading Elasticsearch
+    echo "Downloading Elasticsearch from $downloadUrl ..."
+    curl "$downloadUrl" > $TEST_DATA_DIR/elasticsearch.tar.gz
+
+    local elasticsearchDir=$TEST_DATA_DIR/elasticsearch
+    mkdir -p $elasticsearchDir
+    tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $elasticsearchDir 
--strip-components=1
+
+    # start Elasticsearch cluster
+    $elasticsearchDir/bin/elasticsearch &
+}
+
 function verify_elasticsearch_process_exist {
-    ELASTICSEARCH_PROCESS=$(jps | grep Elasticsearch | awk '{print $2}')
+    local elasticsearchProcess=$(jps | grep Elasticsearch | awk '{print $2}')
 
     # make sure the elasticsearch node is actually running
-    if [ "$ELASTICSEARCH_PROCESS" != "Elasticsearch" ]; then
+    if [ "$elasticsearchProcess" != "Elasticsearch" ]; then
       echo "Elasticsearch node is not running."
       PASS=""
       exit 1
@@ -38,25 +55,26 @@ function verify_elasticsearch_process_exist {
 }
 
 function verify_result {
+    local numRecords=$1
+
     if [ -f "$TEST_DATA_DIR/output" ]; then
         rm $TEST_DATA_DIR/output
     fi
 
-    curl 'localhost:9200/index/_search?q=*&pretty&size=21' > 
$TEST_DATA_DIR/output
+    while : ; do
+      curl 'localhost:9200/index/_search?q=*&pretty&size=21' > 
$TEST_DATA_DIR/output
 
-    if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then
-        echo "Elasticsearch end to end test pass."
-    else
-        echo "Elasticsearch end to end test failed."
-        PASS=""
-        exit 1
-    fi
+      if [ -n "$(grep "\"total\" : $numRecords" $TEST_DATA_DIR/output)" ]; then
+          echo "Elasticsearch end to end test pass."
+          break
+      else
+          echo "Waiting for Elasticsearch records ..."
+          sleep 1
+      fi
+    done
 }
 
 function shutdown_elasticsearch_cluster {
    pid=$(jps | grep Elasticsearch | awk '{print $1}')
-   kill -SIGTERM $pid
-
-   # make sure to run regular cleanup as well
-   cleanup
+   kill -9 $pid
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71095dcb/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh 
b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
new file mode 100755
index 0000000..78ea283
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
@@ -0,0 +1,51 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/elasticsearch-common.sh
+
+ELASTICSEARCH_VERSION=$1
+DOWNLOAD_URL=$2
+
+mkdir -p $TEST_DATA_DIR
+
+setup_elasticsearch $DOWNLOAD_URL
+verify_elasticsearch_process_exist
+
+start_cluster
+
+function test_cleanup {
+  shutdown_elasticsearch_cluster
+
+  # make sure to run regular cleanup as well
+   cleanup
+}
+
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+TEST_ES_JAR=$TEST_DATA_DIR/../../flink-elasticsearch${ELASTICSEARCH_VERSION}-test/target/Elasticsearch${ELASTICSEARCH_VERSION}SinkExample.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -p 1 $TEST_ES_JAR \
+  --numRecords 20 \
+  --index index \
+  --type type
+
+verify_result 20

http://git-wip-us.apache.org/repos/asf/flink/blob/71095dcb/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh 
b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh
deleted file mode 100755
index dea3f13..0000000
--- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh
+++ /dev/null
@@ -1,109 +0,0 @@
-#!/usr/bin/env bash
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-source "$(dirname "$0")"/common.sh
-source "$(dirname "$0")"/elasticsearch-common.sh
-
-mkdir -p $TEST_DATA_DIR
-
-ELASTICSEARCH1_URL="https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz";
-ELASTICSEARCH2_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz";
-ELASTICSEARCH5_URL="https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz";
-
-# start downloading elasticsearch1
-echo "Downloading Elasticsearch1 from $ELASTICSEARCH1_URL"
-curl "$ELASTICSEARCH1_URL" > $TEST_DATA_DIR/elasticsearch1.tar.gz
-
-tar xzf $TEST_DATA_DIR/elasticsearch1.tar.gz -C $TEST_DATA_DIR/
-ELASTICSEARCH1_DIR=$TEST_DATA_DIR/elasticsearch-1.7.1
-
-# start elasticsearch1 cluster
-$ELASTICSEARCH1_DIR/bin/elasticsearch -daemon
-
-verify_elasticsearch_process_exist
-
-start_cluster
-
-TEST_ES1_JAR=$TEST_DATA_DIR/../../flink-elasticsearch1-test/target/Elasticsearch1SinkExample.jar
-
-# run the Flink job
-$FLINK_DIR/bin/flink run -p 1 $TEST_ES1_JAR \
-  --index index \
-  --type type
-
-verify_result
-
-shutdown_elasticsearch_cluster
-
-mkdir -p $TEST_DATA_DIR
-
-# start downloading elasticsearch2
-echo "Downloading Elasticsearch2 from $ELASTICSEARCH2_URL"
-curl "$ELASTICSEARCH2_URL" > $TEST_DATA_DIR/elasticsearch2.tar.gz
-
-tar xzf $TEST_DATA_DIR/elasticsearch2.tar.gz -C $TEST_DATA_DIR/
-ELASTICSEARCH2_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5
-
-# start elasticsearch cluster, different from elasticsearch1 since using 
-daemon here will hang the shell.
-nohup $ELASTICSEARCH2_DIR/bin/elasticsearch &
-
-verify_elasticsearch_process_exist
-
-start_cluster
-
-TEST_ES2_JAR=$TEST_DATA_DIR/../../flink-elasticsearch2-test/target/Elasticsearch2SinkExample.jar
-
-# run the Flink job
-$FLINK_DIR/bin/flink run -p 1 $TEST_ES2_JAR \
-  --index index \
-  --type type
-
-verify_result
-
-shutdown_elasticsearch_cluster
-
-mkdir -p $TEST_DATA_DIR
-
-# start downloading elasticsearch5
-echo "Downloading Elasticsearch5 from $ELASTICSEARCH5_URL"
-curl "$ELASTICSEARCH5_URL" > $TEST_DATA_DIR/elasticsearch5.tar.gz
-
-tar xzf $TEST_DATA_DIR/elasticsearch5.tar.gz -C $TEST_DATA_DIR/
-ELASTICSEARCH5_DIR=$TEST_DATA_DIR/elasticsearch-5.1.2
-
-# start elasticsearch cluster, different from elasticsearch1 since using 
-daemon here will hang the shell.
-nohup $ELASTICSEARCH5_DIR/bin/elasticsearch &
-
-verify_elasticsearch_process_exist
-
-start_cluster
-
-TEST_ES5_JAR=$TEST_DATA_DIR/../../flink-elasticsearch5-test/target/Elasticsearch5SinkExample.jar
-
-# run the Flink job
-$FLINK_DIR/bin/flink run -p 1 $TEST_ES5_JAR \
-  --index index \
-  --type type
-
-verify_result
-
-rm -rf $FLINK_DIR/log/* 2> /dev/null
-
-trap shutdown_elasticsearch_cluster INT
-trap shutdown_elasticsearch_cluster EXIT

Reply via email to