Repository: flink
Updated Branches:
  refs/heads/master f41b00e22 -> 9b7493d14


[FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to end test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5bb60de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5bb60de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5bb60de

Branch: refs/heads/master
Commit: d5bb60de938103d535e0189d6117859f0699ee49
Parents: 97a6638
Author: zhangminglei <[email protected]>
Authored: Tue May 22 09:23:13 2018 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue May 22 15:50:54 2018 +0800

----------------------------------------------------------------------
 .../examples/ElasticsearchSinkExample.java      |  85 -----------
 .../examples/ElasticsearchSinkExample.java      |  81 ----------
 .../examples/ElasticsearchSinkExample.java      |  83 -----------
 .../flink-elasticsearch1-test/pom.xml           | 117 +++++++++++++++
 .../tests/Elasticsearch1SinkExample.java        |  93 ++++++++++++
 .../flink-elasticsearch2-test/pom.xml           | 135 +++++++++++++++++
 .../tests/Elasticsearch2SinkExample.java        |  92 ++++++++++++
 .../flink-elasticsearch5-test/pom.xml           | 148 +++++++++++++++++++
 .../tests/Elasticsearch5SinkExample.java        |  92 ++++++++++++
 flink-end-to-end-tests/pom.xml                  |   3 +
 .../test-scripts/elasticsearch-common.sh        |  62 ++++++++
 .../test_streaming_elasticsearch125.sh          | 109 ++++++++++++++
 12 files changed, 851 insertions(+), 249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d5bb60de/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
 
b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
deleted file mode 100644
index 8a0321d..0000000
--- 
a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
-import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.common.transport.TransportAddress;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you 
must ensure that
- * you have a cluster named "elasticsearch" running or change the cluster name 
in the config map.
- */
-@SuppressWarnings("serial")
-public class ElasticsearchSinkExample {
-
-       public static void main(String[] args) throws Exception {
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               DataStream<String> source = env.generateSequence(0, 20).map(new 
MapFunction<Long, String>() {
-                       @Override
-                       public String map(Long value) throws Exception {
-                               return "message #" + value;
-                       }
-               });
-
-               Map<String, String> userConfig = new HashMap<>();
-               userConfig.put("cluster.name", "elasticsearch");
-               // This instructs the sink to emit after every element, 
otherwise they would be buffered
-               
userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-               List<TransportAddress> transports = new ArrayList<>();
-               transports.add(new 
InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-               source.addSink(new ElasticsearchSink<>(userConfig, transports, 
new ElasticsearchSinkFunction<String>() {
-                       @Override
-                       public void process(String element, RuntimeContext ctx, 
RequestIndexer indexer) {
-                               indexer.add(createIndexRequest(element));
-                       }
-               }));
-
-               env.execute("Elasticsearch Sink Example");
-       }
-
-       private static IndexRequest createIndexRequest(String element) {
-               Map<String, Object> json = new HashMap<>();
-               json.put("data", element);
-
-               return Requests.indexRequest()
-                       .index("my-index")
-                       .type("my-type")
-                       .id(element)
-                       .source(json);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bb60de/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
 
b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
deleted file mode 100644
index c963927..0000000
--- 
a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch2.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you 
must ensure that
- * you have a cluster named "elasticsearch" running or change the name of 
cluster in the config map.
- */
-public class ElasticsearchSinkExample {
-
-       public static void main(String[] args) throws Exception {
-
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               DataStream<String> source = env.generateSequence(0, 20).map(new 
MapFunction<Long, String>() {
-                       @Override
-                       public String map(Long value) throws Exception {
-                               return "message #" + value;
-                       }
-               });
-
-               Map<String, String> userConfig = new HashMap<>();
-               userConfig.put("cluster.name", "elasticsearch");
-               // This instructs the sink to emit after every element, 
otherwise they would be buffered
-               
userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-               List<InetSocketAddress> transports = new ArrayList<>();
-               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-               source.addSink(new ElasticsearchSink<>(userConfig, transports, 
new 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<String>(){
-                       @Override
-                       public void process(String element, RuntimeContext ctx, 
org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) {
-                               indexer.add(createIndexRequest(element));
-                       }
-               }));
-
-               env.execute("Elasticsearch Sink Example");
-       }
-
-       private static IndexRequest createIndexRequest(String element) {
-               Map<String, Object> json = new HashMap<>();
-               json.put("data", element);
-
-               return Requests.indexRequest()
-                       .index("my-index")
-                       .type("my-type")
-                       .id(element)
-                       .source(json);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bb60de/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
deleted file mode 100644
index 22c1053..0000000
--- 
a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch5.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you 
must ensure that
- * you have a cluster named "elasticsearch" running or change the name of 
cluster in the config map.
- */
-public class ElasticsearchSinkExample {
-
-       public static void main(String[] args) throws Exception {
-
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               DataStream<String> source = env.generateSequence(0, 20).map(new 
MapFunction<Long, String>() {
-                       @Override
-                       public String map(Long value) throws Exception {
-                               return "message #" + value;
-                       }
-               });
-
-               Map<String, String> userConfig = new HashMap<>();
-               userConfig.put("cluster.name", "elasticsearch");
-               // This instructs the sink to emit after every element, 
otherwise they would be buffered
-               
userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-               List<InetSocketAddress> transports = new ArrayList<>();
-               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-               source.addSink(new ElasticsearchSink<>(userConfig, transports, 
new ElasticsearchSinkFunction<String>() {
-                       @Override
-                       public void process(String element, RuntimeContext ctx, 
RequestIndexer indexer) {
-                               indexer.add(createIndexRequest(element));
-                       }
-               }));
-
-               env.execute("Elasticsearch Sink Example");
-       }
-
-       private static IndexRequest createIndexRequest(String element) {
-               Map<String, Object> json = new HashMap<>();
-               json.put("data", element);
-
-               return Requests.indexRequest()
-                       .index("my-index")
-                       .type("my-type")
-                       .id(element)
-                       .source(json);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bb60de/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml 
b/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
new file mode 100644
index 0000000..b983e72
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+       <parent>
+               <artifactId>flink-end-to-end-tests</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.6-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       
<artifactId>flink-elasticsearch1-test_${scala.binary.version}</artifactId>
+       <name>flink-elasticsearch1-test</name>
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-elasticsearch_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <version>3.0.0</version>
+                               <executions>
+                                       <!-- Elasticsearch1Sink end to end 
example -->
+                                       <execution>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<minimizeJar>true</minimizeJar>
+                                                       <artifactSet>
+                                                               <excludes>
+                                                                       
<exclude>com.google.code.findbugs:jsr305</exclude>
+                                                                       
<exclude>org.slf4j:*</exclude>
+                                                                       
<exclude>log4j:*</exclude>
+                                                               </excludes>
+                                                       </artifactSet>
+                                                       <transformers>
+                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       
<mainClass>org.apache.flink.streaming.tests.Elasticsearch1SinkExample</mainClass>
+                                                               </transformer>
+                                                       </transformers>
+                                                       <filters>
+                                                               <filter>
+                                                                       
<artifact>*:*</artifact>
+                                                                       
<excludes>
+                                                                               
<exclude>META-INF/*.SF</exclude>
+                                                                               
<exclude>META-INF/*.DSA</exclude>
+                                                                               
<exclude>META-INF/*.RSA</exclude>
+                                                                       
</excludes>
+                                                               </filter>
+                                                       </filters>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <!--simplify the name of the testing JAR for referring 
to it in test_streaming_elasticsearch1.sh scripts-->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-antrun-plugin</artifactId>
+                               <version>1.7</version>
+                               <executions>
+                                       <execution>
+                                               <id>rename</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>run</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <target>
+                                                               <copy 
file="${project.basedir}/target/flink-elasticsearch1-test_${scala.binary.version}-${project.version}.jar"
 tofile="${project.basedir}/target/Elasticsearch1SinkExample.jar" />
+                                                       </target>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bb60de/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
 
b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
new file mode 100644
index 0000000..bfdb806
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End to end test for Elasticsearch1Sink.
+ */
+public class Elasticsearch1SinkExample {
+       public static void main(String[] args) throws Exception {
+
+               final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);
+
+               if (parameterTool.getNumberOfParameters() < 2) {
+                       System.out.println("Missing parameters!\n" +
+                               "Usage: --index <index> --type <type>");
+                       return;
+               }
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().disableSysoutLogging();
+               env.enableCheckpointing(5000);
+
+               DataStream<String> source = env.generateSequence(0, 20).map(new 
MapFunction<Long, String>() {
+                       @Override
+                       public String map(Long value) throws Exception {
+                               return "message # " + value;
+                       }
+               });
+
+               Map<String, String> userConfig = new HashMap<>();
+               userConfig.put("cluster.name", "elasticsearch");
+               // This instructs the sink to emit after every element, 
otherwise they would be buffered
+               
userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+               List<TransportAddress> transports = new ArrayList<>();
+               transports.add(new 
InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+               source.addSink(new ElasticsearchSink<>(userConfig, transports, 
new ElasticsearchSinkFunction<String>() {
+                       @Override
+                       public void process(String element, RuntimeContext ctx, 
RequestIndexer indexer) {
+                               indexer.add(createIndexRequest(element, 
parameterTool));
+                       }
+               }));
+
+               env.execute("Elasticsearch1.x end to end sink test example");
+       }
+
+       private static IndexRequest createIndexRequest(String element, 
ParameterTool parameterTool) {
+               Map<String, Object> json = new HashMap<>();
+               json.put("data", element);
+
+               return Requests.indexRequest()
+                       .index(parameterTool.getRequired("index"))
+                       .type(parameterTool.getRequired("type"))
+                       .id(element)
+                       .source(json);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bb60de/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml 
b/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
new file mode 100644
index 0000000..178d632
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <parent>
+               <artifactId>flink-end-to-end-tests</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.6-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       
<artifactId>flink-elasticsearch2-test_${scala.binary.version}</artifactId>
+       <name>flink-elasticsearch2-test</name>
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <!-- Remove elasticsearch1.7.1 -->
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.elasticsearch</groupId>
+                                       <artifactId>elasticsearch</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.elasticsearch</groupId>
+                       <artifactId>elasticsearch</artifactId>
+                       <version>2.3.5</version>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <version>3.0.0</version>
+                               <executions>
+                                       <!-- Elasticsearch2Sink end to end 
example -->
+                                       <execution>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<minimizeJar>true</minimizeJar>
+                                                       <artifactSet>
+                                                               <excludes>
+                                                                       
<exclude>com.google.code.findbugs:jsr305</exclude>
+                                                                       
<exclude>org.slf4j:*</exclude>
+                                                                       
<exclude>log4j:*</exclude>
+                                                               </excludes>
+                                                       </artifactSet>
+                                                       <transformers>
+                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       
<mainClass>org.apache.flink.streaming.tests.Elasticsearch2SinkExample</mainClass>
+                                                               </transformer>
+                                                       </transformers>
+                                                       <filters>
+                                                               <filter>
+                                                                       
<artifact>*:*</artifact>
+                                                                       
<excludes>
+                                                                               
<exclude>META-INF/*.SF</exclude>
+                                                                               
<exclude>META-INF/*.DSA</exclude>
+                                                                               
<exclude>META-INF/*.RSA</exclude>
+                                                                       
</excludes>
+                                                               </filter>
+                                                       </filters>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <!--simplify the name of the testing JAR for referring 
to it in test_streaming_elasticsearch2.sh scripts-->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-antrun-plugin</artifactId>
+                               <version>1.7</version>
+                               <executions>
+                                       <execution>
+                                               <id>rename</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>run</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <target>
+                                                               <copy 
file="${project.basedir}/target/flink-elasticsearch2-test_${scala.binary.version}-${project.version}.jar"
 tofile="${project.basedir}/target/Elasticsearch2SinkExample.jar" />
+                                                       </target>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bb60de/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
 
b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
new file mode 100644
index 0000000..4ec03aa
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End to end test for Elasticsearch2Sink.
+ */
+public class Elasticsearch2SinkExample {
+
+       public static void main(String[] args) throws Exception {
+
+               final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);
+
+               if (parameterTool.getNumberOfParameters() < 2) {
+                       System.out.println("Missing parameters!\n" +
+                               "Usage: --index <index> --type <type>");
+                       return;
+               }
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().disableSysoutLogging();
+               env.enableCheckpointing(5000);
+
+               DataStream<String> source = env.generateSequence(0, 20).map(new 
MapFunction<Long, String>() {
+                       @Override
+                       public String map(Long value) throws Exception {
+                               return "message #" + value;
+                       }
+               });
+
+               Map<String, String> userConfig = new HashMap<>();
+               userConfig.put("cluster.name", "elasticsearch");
+               // This instructs the sink to emit after every element, 
otherwise they would be buffered
+               
userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+               List<InetSocketAddress> transports = new ArrayList<>();
+               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+               source.addSink(new ElasticsearchSink<>(userConfig, transports, 
new ElasticsearchSinkFunction<String>(){
+                       @Override
+                       public void process(String element, RuntimeContext ctx, 
org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) {
+                               indexer.add(createIndexRequest(element, 
parameterTool));
+                       }
+               }));
+
+               env.execute("Elasticsearch2.x end to end sink test example");
+       }
+
+       private static IndexRequest createIndexRequest(String element, 
ParameterTool parameterTool) {
+               Map<String, Object> json = new HashMap<>();
+               json.put("data", element);
+
+               return Requests.indexRequest()
+                       .index(parameterTool.getRequired("index"))
+                       .type(parameterTool.getRequired("type"))
+                       .id(element)
+                       .source(json);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bb60de/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml 
b/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
new file mode 100644
index 0000000..5b03a7f
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml
@@ -0,0 +1,148 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <parent>
+               <artifactId>flink-end-to-end-tests</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.6-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+       <modelVersion>4.0.0</modelVersion>
+
+       
<artifactId>flink-elasticsearch5-test_${scala.binary.version}</artifactId>
+       <name>flink-elasticsearch5-test</name>
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <exclusions>
+                               <!-- Remove elasticsearch1.7.1 -->
+                               <exclusion>
+                                       <groupId>org.elasticsearch</groupId>
+                                       <artifactId>elasticsearch</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <!-- Dependency for Elasticsearch 5.x Java Client -->
+               <dependency>
+                       <groupId>org.elasticsearch.client</groupId>
+                       <artifactId>transport</artifactId>
+                       <version>5.1.2</version>
+               </dependency>
+
+               <!--
+                       Elasticsearch 5.x uses Log4j2 and no longer detects 
logging implementations, making
+                       Log4j2 a strict dependency. The following is added so 
that the Log4j2 API in
+                       Elasticsearch 5.x is routed to SLF4J. This way, user 
projects can remain flexible
+                       in the logging implementation preferred.
+               -->
+
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-to-slf4j</artifactId>
+                       <version>2.7</version>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <version>3.0.0</version>
+                               <executions>
+                                       <!-- Elasticsearch5Sink end to end 
example -->
+                                       <execution>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<minimizeJar>true</minimizeJar>
+                                                       <artifactSet>
+                                                               <excludes>
+                                                                       
<exclude>com.google.code.findbugs:jsr305</exclude>
+                                                                       
<exclude>org.slf4j:*</exclude>
+                                                                       
<exclude>log4j:*</exclude>
+                                                               </excludes>
+                                                       </artifactSet>
+                                                       <transformers>
+                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       
<mainClass>org.apache.flink.streaming.tests.Elasticsearch5SinkExample</mainClass>
+                                                               </transformer>
+                                                       </transformers>
+                                                       <filters>
+                                                               <filter>
+                                                                       
<artifact>*:*</artifact>
+                                                                       
<excludes>
+                                                                               
<exclude>META-INF/*.SF</exclude>
+                                                                               
<exclude>META-INF/*.DSA</exclude>
+                                                                               
<exclude>META-INF/*.RSA</exclude>
+                                                                       
</excludes>
+                                                               </filter>
+                                                       </filters>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <!--simplify the name of the testing JAR for referring 
to it in test_streaming_elasticsearch5.sh scripts-->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-antrun-plugin</artifactId>
+                               <version>1.7</version>
+                               <executions>
+                                       <execution>
+                                               <id>rename</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>run</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <target>
+                                                               <copy 
file="${project.basedir}/target/flink-elasticsearch5-test_${scala.binary.version}-${project.version}.jar"
 tofile="${project.basedir}/target/Elasticsearch5SinkExample.jar" />
+                                                       </target>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bb60de/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
 
b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
new file mode 100644
index 0000000..285f902
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End to end test for Elasticsearch5Sink.
+ */
+public class Elasticsearch5SinkExample {
+       public static void main(String[] args) throws Exception {
+
+               final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);
+
+               if (parameterTool.getNumberOfParameters() < 2) {
+                       System.out.println("Missing parameters!\n" +
+                               "Usage: --index <index> --type <type>");
+                       return;
+               }
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().disableSysoutLogging();
+               env.enableCheckpointing(5000);
+
+               DataStream<String> source = env.generateSequence(0, 20).map(new 
MapFunction<Long, String>() {
+                       @Override
+                       public String map(Long value) throws Exception {
+                               return "message #" + value;
+                       }
+               });
+
+               Map<String, String> userConfig = new HashMap<>();
+               userConfig.put("cluster.name", "elasticsearch");
+               // This instructs the sink to emit after every element, 
otherwise they would be buffered
+               
userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+               List<InetSocketAddress> transports = new ArrayList<>();
+               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+               source.addSink(new ElasticsearchSink<>(userConfig, transports, 
new ElasticsearchSinkFunction<String>() {
+                       @Override
+                       public void process(String element, RuntimeContext ctx, 
RequestIndexer indexer) {
+                               indexer.add(createIndexRequest(element, 
parameterTool));
+                       }
+               }));
+
+               env.execute("Elasticsearch5.x end to end sink test example");
+       }
+
+       private static IndexRequest createIndexRequest(String element, 
ParameterTool parameterTool) {
+               Map<String, Object> json = new HashMap<>();
+               json.put("data", element);
+
+               return Requests.indexRequest()
+                       .index(parameterTool.getRequired("index"))
+                       .type(parameterTool.getRequired("type"))
+                       .id(element)
+                       .source(json);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bb60de/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 367e120..581abc8 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -44,6 +44,9 @@ under the License.
                <module>flink-high-parallelism-iterations-test</module>
                <module>flink-stream-stateful-job-upgrade-test</module>
                <module>flink-local-recovery-and-allocation-test</module>
+               <module>flink-elasticsearch1-test</module>
+               <module>flink-elasticsearch2-test</module>
+               <module>flink-elasticsearch5-test</module>
        </modules>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bb60de/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh 
b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
new file mode 100644
index 0000000..3fda344
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -0,0 +1,62 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -o pipefail
+
+if [[ -z $TEST_DATA_DIR ]]; then
+  echo "Must run common.sh before kafka-common.sh."
+  exit 1
+fi
+
+function verify_elasticsearch_process_exist {
+    ELASTICSEARCH_PROCESS=$(jps | grep Elasticsearch | awk '{print $2}')
+
+    # make sure the elasticsearch node is actually running
+    if [ "$ELASTICSEARCH_PROCESS" != "Elasticsearch" ]; then
+      echo "Elasticsearch node is not running."
+      PASS=""
+      exit 1
+    else
+      echo "Elasticsearch node is running."
+    fi
+}
+
+function verify_result {
+    if [ -f "$TEST_DATA_DIR/output" ]; then
+        rm $TEST_DATA_DIR/output
+    fi
+
+    curl 'localhost:9200/index/_search?q=*&pretty&size=21' > 
$TEST_DATA_DIR/output
+
+    if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then
+        echo "Elasticsearch end to end test pass."
+    else
+        echo "Elasticsearch end to end test failed."
+        PASS=""
+        exit 1
+    fi
+}
+
+function shutdown_elasticsearch_cluster {
+   pid=$(jps | grep Elasticsearch | awk '{print $1}')
+   kill -SIGTERM $pid
+
+   # make sure to run regular cleanup as well
+   cleanup
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bb60de/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh 
b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh
new file mode 100755
index 0000000..dea3f13
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh
@@ -0,0 +1,109 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/elasticsearch-common.sh
+
+mkdir -p $TEST_DATA_DIR
+
+ELASTICSEARCH1_URL="https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz";
+ELASTICSEARCH2_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz";
+ELASTICSEARCH5_URL="https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz";
+
+# start downloading elasticsearch1
+echo "Downloading Elasticsearch1 from $ELASTICSEARCH1_URL"
+curl "$ELASTICSEARCH1_URL" > $TEST_DATA_DIR/elasticsearch1.tar.gz
+
+tar xzf $TEST_DATA_DIR/elasticsearch1.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH1_DIR=$TEST_DATA_DIR/elasticsearch-1.7.1
+
+# start elasticsearch1 cluster
+$ELASTICSEARCH1_DIR/bin/elasticsearch -daemon
+
+verify_elasticsearch_process_exist
+
+start_cluster
+
+TEST_ES1_JAR=$TEST_DATA_DIR/../../flink-elasticsearch1-test/target/Elasticsearch1SinkExample.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -p 1 $TEST_ES1_JAR \
+  --index index \
+  --type type
+
+verify_result
+
+shutdown_elasticsearch_cluster
+
+mkdir -p $TEST_DATA_DIR
+
+# start downloading elasticsearch2
+echo "Downloading Elasticsearch2 from $ELASTICSEARCH2_URL"
+curl "$ELASTICSEARCH2_URL" > $TEST_DATA_DIR/elasticsearch2.tar.gz
+
+tar xzf $TEST_DATA_DIR/elasticsearch2.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH2_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5
+
+# start elasticsearch cluster, different from elasticsearch1 since using 
-daemon here will hang the shell.
+nohup $ELASTICSEARCH2_DIR/bin/elasticsearch &
+
+verify_elasticsearch_process_exist
+
+start_cluster
+
+TEST_ES2_JAR=$TEST_DATA_DIR/../../flink-elasticsearch2-test/target/Elasticsearch2SinkExample.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -p 1 $TEST_ES2_JAR \
+  --index index \
+  --type type
+
+verify_result
+
+shutdown_elasticsearch_cluster
+
+mkdir -p $TEST_DATA_DIR
+
+# start downloading elasticsearch5
+echo "Downloading Elasticsearch5 from $ELASTICSEARCH5_URL"
+curl "$ELASTICSEARCH5_URL" > $TEST_DATA_DIR/elasticsearch5.tar.gz
+
+tar xzf $TEST_DATA_DIR/elasticsearch5.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH5_DIR=$TEST_DATA_DIR/elasticsearch-5.1.2
+
+# start elasticsearch cluster, different from elasticsearch1 since using 
-daemon here will hang the shell.
+nohup $ELASTICSEARCH5_DIR/bin/elasticsearch &
+
+verify_elasticsearch_process_exist
+
+start_cluster
+
+TEST_ES5_JAR=$TEST_DATA_DIR/../../flink-elasticsearch5-test/target/Elasticsearch5SinkExample.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -p 1 $TEST_ES5_JAR \
+  --index index \
+  --type type
+
+verify_result
+
+rm -rf $FLINK_DIR/log/* 2> /dev/null
+
+trap shutdown_elasticsearch_cluster INT
+trap shutdown_elasticsearch_cluster EXIT

Reply via email to