Repository: flink
Updated Branches:
  refs/heads/master b452c8bbb -> b5caaef82


http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
 
b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
index bc9bedc..93ac6c8 100644
--- 
a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
@@ -1,13 +1,12 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,217 +16,53 @@
  */
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
-import org.junit.Assert;
-import org.junit.ClassRule;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase 
{
-
-       private static final int NUM_ELEMENTS = 20;
-
-       @ClassRule
-       public static TemporaryFolder tempFolder = new TemporaryFolder();
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 
        @Test
        public void testTransportClient() throws Exception {
-
-               File dataDir = tempFolder.newFolder();
-
-               Node node = NodeBuilder.nodeBuilder()
-                               .settings(Settings.settingsBuilder()
-                                               .put("path.home", 
dataDir.getParent())
-                                               .put("http.enabled", false)
-                                               .put("path.data", 
dataDir.getAbsolutePath()))
-                               // set a custom cluster name to verify that 
user config works correctly
-                               .clusterName("my-transport-client-cluster")
-                               .node();
-
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               DataStreamSource<Tuple2<Integer, String>> source = 
env.addSource(new TestSourceFunction());
-
-               Map<String, String> config = new HashMap<>();
-               // This instructs the sink to emit after every element, 
otherwise they would be buffered
-               config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, 
"1");
-               config.put("cluster.name", "my-transport-client-cluster");
-
-               // Can't use {@link TransportAddress} as its not Serializable 
in Elasticsearch 2.x
-               List<InetSocketAddress> transports = new ArrayList<>();
-               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-               source.addSink(new ElasticsearchSink<>(config, transports, new 
TestElasticsearchSinkFunction()));
-
-               env.execute("Elasticsearch TransportClient Test");
-
-               // verify the results
-               Client client = node.client();
-               for (int i = 0; i < NUM_ELEMENTS; i++) {
-                       GetResponse response = client.get(new 
GetRequest("my-index",
-                                       "my-type", 
Integer.toString(i))).actionGet();
-                       Assert.assertEquals("message #" + i, 
response.getSource().get("data"));
-               }
-
-               node.close();
+               runTransportClientTest();
        }
 
- @Test(expected = IllegalArgumentException.class)
- public void testNullTransportClient() throws Exception {
-
-       File dataDir = tempFolder.newFolder();
-
-       Node node = NodeBuilder.nodeBuilder()
-               .settings(Settings.settingsBuilder()
-                       .put("path.home", dataDir.getParent())
-                       .put("http.enabled", false)
-                       .put("path.data", dataDir.getAbsolutePath()))
-               // set a custom cluster name to verify that user config works 
correctly
-               .clusterName("my-transport-client-cluster")
-               .node();
-
-       final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-       DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new 
TestSourceFunction());
-
-       Map<String, String> config = new HashMap<>();
-       // This instructs the sink to emit after every element, otherwise they 
would be buffered
-       config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-       config.put("cluster.name", "my-transport-client-cluster");
-
-       source.addSink(new ElasticsearchSink<>(config, null, new 
TestElasticsearchSinkFunction()));
-
-       env.execute("Elasticsearch TransportClient Test");
-
-       // verify the results
-       Client client = node.client();
-       for (int i = 0; i < NUM_ELEMENTS; i++) {
-        GetResponse response = client.get(new GetRequest("my-index",
-                "my-type", Integer.toString(i))).actionGet();
-        Assert.assertEquals("message #" + i, response.getSource().get("data"));
+       @Test
+       public void testNullTransportClient() throws Exception {
+               runNullTransportClientTest();
        }
 
-       node.close();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testEmptyTransportClient() throws Exception {
-
-       File dataDir = tempFolder.newFolder();
-
-       Node node = NodeBuilder.nodeBuilder()
-               .settings(Settings.settingsBuilder()
-                       .put("path.home", dataDir.getParent())
-                       .put("http.enabled", false)
-                       .put("path.data", dataDir.getAbsolutePath()))
-               // set a custom cluster name to verify that user config works 
correctly
-               .clusterName("my-transport-client-cluster")
-               .node();
-
-       final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-       DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new 
TestSourceFunction());
-
-       Map<String, String> config = new HashMap<>();
-       // This instructs the sink to emit after every element, otherwise they 
would be buffered
-       config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-       config.put("cluster.name", "my-transport-client-cluster");
-
-       source.addSink(new ElasticsearchSink<>(config, new 
ArrayList<InetSocketAddress>(), new TestElasticsearchSinkFunction()));
-
-       env.execute("Elasticsearch TransportClient Test");
-
-       // verify the results
-       Client client = node.client();
-       for (int i = 0; i < NUM_ELEMENTS; i++) {
-        GetResponse response = client.get(new GetRequest("my-index",
-                "my-type", Integer.toString(i))).actionGet();
-        Assert.assertEquals("message #" + i, response.getSource().get("data"));
+       @Test
+       public void testEmptyTransportClient() throws Exception {
+               runEmptyTransportClientTest();
        }
 
-       node.close();
- }
-
-       @Test(expected = JobExecutionException.class)
+       @Test
        public void testTransportClientFails() throws Exception{
-               // this checks whether the TransportClient fails early when 
there is no cluster to
-               // connect to. There isn't a similar test for the Node Client 
version since that
-               // one will block and wait for a cluster to come online
-
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               DataStreamSource<Tuple2<Integer, String>> source = 
env.addSource(new TestSourceFunction());
-
-               Map<String, String> config = new HashMap<>();
-               // This instructs the sink to emit after every element, 
otherwise they would be buffered
-               config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, 
"1");
-               config.put("cluster.name", "my-node-client-cluster");
-
-               List<InetSocketAddress> transports = new ArrayList<>();
-               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-               source.addSink(new ElasticsearchSink<>(config, transports, new 
TestElasticsearchSinkFunction()));
-
-               env.execute("Elasticsearch Node Client Test");
+               runTransportClientFailsTest();
        }
 
-       private static class TestSourceFunction implements 
SourceFunction<Tuple2<Integer, String>> {
-               private static final long serialVersionUID = 1L;
-
-               private volatile boolean running = true;
-
-               @Override
-               public void run(SourceContext<Tuple2<Integer, String>> ctx) 
throws Exception {
-                       for (int i = 0; i < NUM_ELEMENTS && running; i++) {
-                               ctx.collect(Tuple2.of(i, "message #" + i));
-                       }
-               }
-
-               @Override
-               public void cancel() {
-                       running = false;
-               }
+       @Override
+       protected <T> ElasticsearchSinkBase<T> 
createElasticsearchSink(Map<String, String> userConfig,
+                                                                               
                                                List<InetSocketAddress> 
transportAddresses,
+                                                                               
                                                ElasticsearchSinkFunction<T> 
elasticsearchSinkFunction) {
+               return new ElasticsearchSink<>(userConfig, transportAddresses, 
elasticsearchSinkFunction);
        }
 
-       private static class TestElasticsearchSinkFunction implements 
ElasticsearchSinkFunction<Tuple2<Integer, String>> {
-               private static final long serialVersionUID = 1L;
+       @Override
+       protected <T> ElasticsearchSinkBase<T> 
createElasticsearchSinkForEmbeddedNode(
+               Map<String, String> userConfig, ElasticsearchSinkFunction<T> 
elasticsearchSinkFunction) throws Exception {
 
-               public IndexRequest createIndexRequest(Tuple2<Integer, String> 
element) {
-                       Map<String, Object> json = new HashMap<>();
-                       json.put("data", element.f1);
-
-                       return Requests.indexRequest()
-                                       .index("my-index")
-                                       .type("my-type")
-                                       .id(element.f0.toString())
-                                       .source(json);
-               }
+               List<InetSocketAddress> transports = new ArrayList<>();
+               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
 
-               @Override
-               public void process(Tuple2<Integer, String> element, 
RuntimeContext ctx, RequestIndexer indexer) {
-                       indexer.add(createIndexRequest(element));
-               }
+               return new ElasticsearchSink<>(userConfig, transports, 
elasticsearchSinkFunction);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
 
b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
deleted file mode 100644
index 05760e8..0000000
--- 
a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch2.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
-import 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you 
must ensure that
- * you have a cluster named "elasticsearch" running or change the name of 
cluster in the config map.
- */
-public class ElasticsearchExample {
-
-       public static void main(String[] args) throws Exception {
-
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               SingleOutputStreamOperator<String> source =
-                               env.generateSequence(0, 20).map(new 
MapFunction<Long, String>() {
-                                       /**
-                                        * The mapping method. Takes an element 
from the input data set and transforms
-                                        * it into exactly one element.
-                                        *
-                                        * @param value The input value.
-                                        * @return The transformed value
-                                        * @throws Exception This method may 
throw exceptions. Throwing an exception will cause the operation
-                                        *                   to fail and may 
trigger recovery.
-                                        */
-                                       @Override
-                                       public String map(Long value) throws 
Exception {
-                                               return "message #" + value;
-                                       }
-                               });
-
-               Map<String, String> config = new HashMap<>();
-               // This instructs the sink to emit after every element, 
otherwise they would be buffered
-               config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, 
"1");
-
-               List<InetSocketAddress> transports = new ArrayList<>();
-               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-               source.addSink(new ElasticsearchSink<>(config, transports, new 
ElasticsearchSinkFunction<String>(){
-                       @Override
-                       public void process(String element, RuntimeContext ctx, 
RequestIndexer indexer) {
-                               indexer.add(createIndexRequest(element));
-                       }
-               }));
-
-               env.execute("Elasticsearch Example");
-       }
-
-       private static IndexRequest createIndexRequest(String element) {
-               Map<String, Object> json = new HashMap<>();
-               json.put("data", element);
-
-               return Requests.indexRequest()
-                               .index("my-index")
-                               .type("my-type")
-                               .id(element)
-                               .source(json);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
 
b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
new file mode 100644
index 0000000..8c50847
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch2.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it you 
must ensure that
+ * you have a cluster named "elasticsearch" running or change the name of 
cluster in the config map.
+ */
+public class ElasticsearchSinkExample {
+
+       public static void main(String[] args) throws Exception {
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<String> source = env.generateSequence(0, 20).map(new 
MapFunction<Long, String>() {
+                       @Override
+                       public String map(Long value) throws Exception {
+                               return "message #" + value;
+                       }
+               });
+
+               Map<String, String> userConfig = new HashMap<>();
+               userConfig.put("cluster.name", "elasticsearch");
+               // This instructs the sink to emit after every element, 
otherwise they would be buffered
+               
userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+               List<InetSocketAddress> transports = new ArrayList<>();
+               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+               source.addSink(new ElasticsearchSink<>(userConfig, transports, 
new 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<String>(){
+                       @Override
+                       public void process(String element, RuntimeContext ctx, 
org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) {
+                               indexer.add(createIndexRequest(element));
+                       }
+               }));
+
+               env.execute("Elasticsearch Sink Example");
+       }
+
+       private static IndexRequest createIndexRequest(String element) {
+               Map<String, Object> json = new HashMap<>();
+               json.put("data", element);
+
+               return Requests.indexRequest()
+                       .index("my-index")
+                       .type("my-type")
+                       .id(element)
+                       .source(json);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties
 
b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties
index dc20726..2055184 100644
--- 
a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties
+++ 
b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties
@@ -16,12 +16,12 @@
 # limitations under the License.
 
################################################################################
 
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.target=System.err
 log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/pom.xml 
b/flink-connectors/flink-connector-elasticsearch5/pom.xml
index 8fc5c8b..a0bd328 100644
--- a/flink-connectors/flink-connector-elasticsearch5/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch5/pom.xml
@@ -26,7 +26,7 @@ under the License.
        <parent>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connectors</artifactId>
-               <version>1.2-SNAPSHOT</version>
+               <version>1.3-SNAPSHOT</version>
                <relativePath>..</relativePath>
        </parent>
 
@@ -37,7 +37,7 @@ under the License.
 
        <!-- Allow users to pass custom connector versions -->
        <properties>
-               <elasticsearch.version>5.0.0</elasticsearch.version>
+               <elasticsearch.version>5.1.2</elasticsearch.version>
        </properties>
 
        <dependencies>
@@ -52,27 +52,38 @@ under the License.
                </dependency>
 
                <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <exclusions>
+                               <!-- Elasticsearch Java Client has been moved 
to a different module in 5.x -->
+                               <exclusion>
+                                       <groupId>org.elasticsearch</groupId>
+                                       <artifactId>elasticsearch</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <!-- Dependency for Elasticsearch 5.x Java Client -->
+               <dependency>
                        <groupId>org.elasticsearch.client</groupId>
                        <artifactId>transport</artifactId>
                        <version>${elasticsearch.version}</version>
                </dependency>
 
+               <!--
+                       Elasticsearch 5.x uses Log4j2 and no longer detects 
logging implementations, making
+                       Log4j2 a strict dependency. The following is added so 
that the Log4j2 API in
+                       Elasticsearch 5.x is routed to SLF4J. This way, user 
projects can remain flexible
+                       in the logging implementation preferred.
+               -->
+
                <dependency>
                        <groupId>org.apache.logging.log4j</groupId>
-                       <artifactId>log4j-api</artifactId>
-                       <version>2.7</version>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.logging.log4j</groupId>
-                       <artifactId>log4j-core</artifactId>
+                       <artifactId>log4j-to-slf4j</artifactId>
                        <version>2.7</version>
                </dependency>
 
-               <dependency>
-                       <groupId>com.fasterxml.jackson.core</groupId>
-                       <artifactId>jackson-core</artifactId>
-               </dependency>
-
                <!-- test dependencies -->
 
                <dependency>
@@ -81,6 +92,7 @@ under the License.
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-streaming-java_2.10</artifactId>
@@ -88,6 +100,63 @@ under the License.
                        <scope>test</scope>
                        <type>test-jar</type>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.elasticsearch</groupId>
+                                       <artifactId>elasticsearch</artifactId>
+                               </exclusion>
+                       </exclusions>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <!--
+                       Including Log4j2 dependencies for tests is required for 
the
+                       embedded Elasticsearch nodes used in tests to run 
correctly.
+               -->
+
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-api</artifactId>
+                       <version>2.7</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-core</artifactId>
+                       <version>2.7</version>
+                       <scope>test</scope>
+               </dependency>
+
        </dependencies>
 
+       <build>
+               <plugins>
+                       <!--
+                               For the tests, we need to exclude the Log4j2 to 
slf4j adapter dependency
+                               and let Elasticsearch directly use Log4j2, 
otherwise the embedded Elasticsearch node
+                               used in tests will fail to work.
+
+                               In other words, the connector jar is routing 
Elasticsearch 5.x's Log4j2 API's to SLF4J,
+                               but for the test builds, we still stick to 
directly using Log4j2.
+                       -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <version>2.12.2</version>
+                               <configuration>
+                                       <classpathDependencyExcludes>
+                                               
<classpathDependencyExclude>org.apache.logging.log4j:log4j-to-slf4j</classpathDependencyExclude>
+                                       </classpathDependencyExcludes>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java
deleted file mode 100644
index f7ca499..0000000
--- 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch5;
-
-import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.bulk.BulkProcessor;
-
-public class BulkProcessorIndexer implements RequestIndexer {
-       private final BulkProcessor bulkProcessor;
-
-       public BulkProcessorIndexer(BulkProcessor bulkProcessor) {
-               this.bulkProcessor = bulkProcessor;
-       }
-
-       @Override
-       public void add(ActionRequest... actionRequests) {
-               for (ActionRequest actionRequest : actionRequests) {
-                       this.bulkProcessor.add(actionRequest);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
new file mode 100644
index 0000000..1389e7d
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import 
org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.transport.Netty3Plugin;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.x.
+ */
+public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge 
{
+
+       private static final long serialVersionUID = -5222683870097809633L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(Elasticsearch5ApiCallBridge.class);
+
+       /**
+        * User-provided transport addresses.
+        *
+        * We are using {@link InetSocketAddress} because {@link 
TransportAddress} is not serializable in Elasticsearch 5.x.
+        */
+       private final List<InetSocketAddress> transportAddresses;
+
+       Elasticsearch5ApiCallBridge(List<InetSocketAddress> transportAddresses) 
{
+               Preconditions.checkArgument(transportAddresses != null && 
!transportAddresses.isEmpty());
+               this.transportAddresses = transportAddresses;
+       }
+
+       @Override
+       public Client createClient(Map<String, String> clientConfig) {
+               Settings settings = Settings.builder().put(clientConfig)
+                       .put(NetworkModule.HTTP_TYPE_KEY, 
Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
+                       .put(NetworkModule.TRANSPORT_TYPE_KEY, 
Netty3Plugin.NETTY_TRANSPORT_NAME)
+                       .build();
+
+               TransportClient transportClient = new 
PreBuiltTransportClient(settings);
+               for (TransportAddress transport : 
ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
+                       transportClient.addTransportAddress(transport);
+               }
+
+               // verify that we actually are connected to a cluster
+               if (transportClient.connectedNodes().isEmpty()) {
+                       throw new RuntimeException("Elasticsearch client is not 
connected to any Elasticsearch nodes!");
+               }
+
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Created Elasticsearch TransportClient with 
connected nodes {}", transportClient.connectedNodes());
+               }
+
+               return transportClient;
+       }
+
+       @Override
+       public Throwable 
extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+               if (!bulkItemResponse.isFailed()) {
+                       return null;
+               } else {
+                       return bulkItemResponse.getFailure().getCause();
+               }
+       }
+
+       @Override
+       public void cleanup() {
+               // nothing to cleanup
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
index 29c69c4..9107d4e 100644
--- 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
+++ 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
@@ -16,244 +16,61 @@
  */
 package org.apache.flink.streaming.connectors.elasticsearch5;
 
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.util.Preconditions;
-import org.elasticsearch.action.bulk.BulkItemResponse;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.network.NetworkModule;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.transport.Netty3Plugin;
-import org.elasticsearch.transport.client.PreBuiltTransportClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Sink that emits its input elements in bulk to an Elasticsearch cluster.
- * <p>
+ * Elasticsearch 5.x sink that requests multiple {@link ActionRequest 
ActionRequests}
+ * against a cluster for each incoming element.
+ *
  * <p>
- * The first {@link Map} passed to the constructor is forwarded to 
Elasticsearch when creating
- * {@link TransportClient}. The config keys can be found in the Elasticsearch
- * documentation. An important setting is {@code cluster.name}, this should be 
set to the name
- * of the cluster that the sink should emit to.
+ * The sink internally uses a {@link TransportClient} to communicate with an 
Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to using the provided 
transport addresses passed to the constructor.
+ *
  * <p>
- * <b>Attention: </b> When using the {@code TransportClient} the sink will 
fail if no cluster
- * can be connected to.
+ * The {@link Map} passed to the constructor is used to create the {@code 
TransportClient}. The config keys can be found
+ * in the <a href="https://www.elastic.io";>Elasticsearch documentation</a>. An 
important setting is {@code cluster.name},
+ * which should be set to the name of the cluster that the sink should emit to.
+ *
  * <p>
- * The second {@link Map} is used to configure a {@link BulkProcessor} to send 
{@link IndexRequest IndexRequests}.
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link 
ActionRequest ActionRequests}.
  * This will buffer elements before sending a request to the cluster. The 
behaviour of the
  * {@code BulkProcessor} can be configured using these config keys:
  * <ul>
- * <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer
- * <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) 
to buffer
- * <li> {@code bulk.flush.interval.ms}: Interval at which to flush data 
regardless of the other two
- * settings in milliseconds
+ *   <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer
+ *   <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in 
megabytes) to buffer
+ *   <li> {@code bulk.flush.interval.ms}: Interval at which to flush data 
regardless of the other two
+ *   settings in milliseconds
  * </ul>
+ *
  * <p>
- * <p>
- * You also have to provide an {@link RequestIndexer}. This is used to create 
an
- * {@link IndexRequest} from an element that needs to be added to 
Elasticsearch. See
- * {@link RequestIndexer} for an example.
+ * You also have to provide an {@link ElasticsearchSinkFunction}. This is used 
to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the 
class level documentation of
+ * {@link ElasticsearchSinkFunction} for an example.
  *
- * @param <T> Type of the elements emitted by this sink
+ * @param <T> Type of the elements handled by this sink
  */
-public class ElasticsearchSink<T> extends RichSinkFunction<T> {
-
-       public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = 
"bulk.flush.max.actions";
-       public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = 
"bulk.flush.max.size.mb";
-       public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = 
"bulk.flush.interval.ms";
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
 
        private static final long serialVersionUID = 1L;
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(ElasticsearchSink.class);
-
-       /**
-        * The user specified config map that we forward to Elasticsearch when 
we create the Client.
-        */
-       private final Map<String, String> esConfig;
-
-       /**
-        * The user specified config map that we use to configure BulkProcessor.
-        */
-       private final Map<String, String> sinkConfig;
-
-       /**
-        * The list of nodes that the TransportClient should connect to. This 
is null if we are using
-        * an embedded Node to get a Client.
-        */
-       private final List<InetSocketAddress> transportAddresses;
-
-       /**
-        * The builder that is used to construct an {@link IndexRequest} from 
the incoming element.
-        */
-       private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
-
-       /**
-        * The Client that was either retrieved from a Node or is a 
TransportClient.
-        */
-       private transient Client client;
-
-       /**
-        * Bulk processor that was created using the client
-        */
-       private transient BulkProcessor bulkProcessor;
-
-       /**
-        * Bulk {@link org.elasticsearch.action.ActionRequest} indexer
-        */
-       private transient RequestIndexer requestIndexer;
-
-       /**
-        * This is set from inside the BulkProcessor listener if there where 
failures in processing.
-        */
-       private final AtomicBoolean hasFailure = new AtomicBoolean(false);
-
-       /**
-        * This is set from inside the BulkProcessor listener if a Throwable 
was thrown during processing.
-        */
-       private final AtomicReference<Throwable> failureThrowable = new 
AtomicReference<>();
-
        /**
-        * Creates a new ElasticsearchSink that connects to the cluster using a 
TransportClient.
+        * Creates a new {@code ElasticsearchSink} that connects to the cluster 
using a {@link TransportClient}.
         *
-        * @param esConfig                  The map of user settings that are 
passed when constructing the TransportClient
-        * @param sinkConfig                The map of user settings that are 
passed when constructing the BulkProcessor
-        * @param transportAddresses        The Elasticsearch Nodes to which to 
connect using a {@code TransportClient}
-        * @param elasticsearchSinkFunction This is used to generate the 
ActionRequest from the incoming element
+        * @param userConfig The map of user settings that are used when 
constructing the {@link TransportClient}
+        * @param transportAddresses The addresses of Elasticsearch nodes to 
which to connect using a {@link TransportClient}
+        * @param elasticsearchSinkFunction This is used to generate multiple 
{@link ActionRequest} from the incoming element
         */
-       public ElasticsearchSink(Map<String, String> esConfig, Map<String, 
String> sinkConfig, List<InetSocketAddress> transportAddresses, 
ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-               this.esConfig = esConfig;
-               this.sinkConfig = sinkConfig;
-               this.elasticsearchSinkFunction = elasticsearchSinkFunction;
-               Preconditions.checkArgument(transportAddresses != null && 
transportAddresses.size() > 0);
-               this.transportAddresses = transportAddresses;
+       public ElasticsearchSink(Map<String, String> userConfig,
+                                                       List<InetSocketAddress> 
transportAddresses,
+                                                       
ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+               super(new Elasticsearch5ApiCallBridge(transportAddresses), 
userConfig, elasticsearchSinkFunction);
        }
-
-       /**
-        * Initializes the connection to Elasticsearch by creating a
-        * {@link org.elasticsearch.client.transport.TransportClient}.
-        */
-       @Override
-       public void open(Configuration configuration) {
-               List<TransportAddress> transportNodes;
-               transportNodes = new ArrayList<>(transportAddresses.size());
-               for (InetSocketAddress address : transportAddresses) {
-                       transportNodes.add(new 
InetSocketTransportAddress(address));
-               }
-
-               Settings settings = Settings.builder().put(esConfig)
-                       .put(NetworkModule.HTTP_TYPE_KEY, 
Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
-                       .put(NetworkModule.TRANSPORT_TYPE_KEY, 
Netty3Plugin.NETTY_TRANSPORT_NAME)
-                       .build();
-
-               TransportClient transportClient = new 
PreBuiltTransportClient(settings);
-               for (TransportAddress transport : transportNodes) {
-                       transportClient.addTransportAddress(transport);
-               }
-
-               // verify that we actually are connected to a cluster
-               if (transportClient.connectedNodes().isEmpty()) {
-                       throw new RuntimeException("Client is not connected to 
any Elasticsearch nodes!");
-               }
-
-               client = transportClient;
-
-               if (LOG.isInfoEnabled()) {
-                       LOG.info("Created Elasticsearch TransportClient {}", 
client);
-               }
-
-               BulkProcessor.Builder bulkProcessorBuilder = 
BulkProcessor.builder(client, new BulkProcessor.Listener() {
-                       @Override
-                       public void beforeBulk(long executionId, BulkRequest 
request) {
-
-                       }
-
-                       @Override
-                       public void afterBulk(long executionId, BulkRequest 
request, BulkResponse response) {
-                               if (response.hasFailures()) {
-                                       for (BulkItemResponse itemResp : 
response.getItems()) {
-                                               if (itemResp.isFailed()) {
-                                                       LOG.error("Failed to 
index document in Elasticsearch: " + itemResp.getFailureMessage());
-                                                       
failureThrowable.compareAndSet(null, new 
RuntimeException(itemResp.getFailureMessage()));
-                                               }
-                                       }
-                                       hasFailure.set(true);
-                               }
-                       }
-
-                       @Override
-                       public void afterBulk(long executionId, BulkRequest 
request, Throwable failure) {
-                               LOG.error(failure.getMessage());
-                               failureThrowable.compareAndSet(null, failure);
-                               hasFailure.set(true);
-                       }
-               });
-
-               // This makes flush() blocking
-               bulkProcessorBuilder.setConcurrentRequests(0);
-
-               ParameterTool params = ParameterTool.fromMap(sinkConfig);
-
-               if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
-                       
bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
-               }
-
-               if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
-                       bulkProcessorBuilder.setBulkSize(new 
ByteSizeValue(params.getInt(
-                               CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), 
ByteSizeUnit.MB));
-               }
-
-               if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
-                       
bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
-               }
-
-               bulkProcessor = bulkProcessorBuilder.build();
-               requestIndexer = new BulkProcessorIndexer(bulkProcessor);
-       }
-
-       @Override
-       public void invoke(T element) {
-               elasticsearchSinkFunction.process(element, getRuntimeContext(), 
requestIndexer);
-       }
-
-       @Override
-       public void close() {
-               if (bulkProcessor != null) {
-                       bulkProcessor.close();
-                       bulkProcessor = null;
-               }
-
-               if (client != null) {
-                       client.close();
-               }
-
-               if (hasFailure.get()) {
-                       Throwable cause = failureThrowable.get();
-                       if (cause != null) {
-                               throw new RuntimeException("An error occurred 
in ElasticsearchSink.", cause);
-                       } else {
-                               throw new RuntimeException("An error occurred 
in ElasticsearchSink.");
-                       }
-               }
-
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java
deleted file mode 100644
index 752a83e..0000000
--- 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch5;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-import java.io.Serializable;
-
-/**
- * Method that creates an {@link org.elasticsearch.action.ActionRequest} from 
an element in a Stream.
- *
- * <p>
- * This is used by {@link ElasticsearchSink} to prepare elements for sending 
them to Elasticsearch.
- *
- * <p>
- * Example:
- *
- * <pre>{@code
- *                                     private static class 
TestElasticSearchSinkFunction implements
- *                                             
ElasticsearchSinkFunction<Tuple2<Integer, String>> {
- *
- *                                     public IndexRequest 
createIndexRequest(Tuple2<Integer, String> element) {
- *                                             Map<String, Object> json = new 
HashMap<>();
- *                                             json.put("data", element.f1);
- *
- *                                             return Requests.indexRequest()
- *                                                     .index("my-index")
- *                                                     .type("my-type")
- *                                                     
.id(element.f0.toString())
- *                                                     .source(json);
- *                                             }
- *
- *                             public void process(Tuple2<Integer, String> 
element, RuntimeContext ctx, RequestIndexer indexer) {
- *                                     
indexer.add(createIndexRequest(element));
- *                             }
- *             }
- *
- * }</pre>
- *
- * @param <T> The type of the element handled by this {@code 
ElasticsearchSinkFunction}
- */
-public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
-       void process(T element, RuntimeContext ctx, RequestIndexer indexer);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java
deleted file mode 100644
index 170df31..0000000
--- 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch5;
-
-import org.elasticsearch.action.ActionRequest;
-
-import java.io.Serializable;
-
-public interface RequestIndexer extends Serializable {
-       void add(ActionRequest... actionRequests);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
new file mode 100644
index 0000000..f3d8897
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSinkITCase;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.internal.InternalSettingsPreparer;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.transport.Netty3Plugin;
+
+import java.io.File;
+import java.util.Collections;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for 
Elasticsearch 5.x.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for 
integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements 
EmbeddedElasticsearchNodeEnvironment {
+
+       private Node node;
+
+       @Override
+       public void start(File tmpDataFolder, String clusterName) throws 
Exception {
+               if (node == null) {
+                       Settings settings = Settings.builder()
+                               .put("cluster.name", clusterName)
+                               .put("http.enabled", false)
+                               .put("path.home", tmpDataFolder.getParent())
+                               .put("path.data", 
tmpDataFolder.getAbsolutePath())
+                               .put(NetworkModule.HTTP_TYPE_KEY, 
Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
+                               .put(NetworkModule.TRANSPORT_TYPE_KEY, 
Netty3Plugin.NETTY_TRANSPORT_NAME)
+                               .build();
+
+                       node = new PluginNode(settings);
+                       node.start();
+               }
+       }
+
+       @Override
+       public void close() throws Exception {
+               if (node != null && !node.isClosed()) {
+                       node.close();
+                       node = null;
+               }
+       }
+
+       @Override
+       public Client getClient() {
+               if (node != null && !node.isClosed()) {
+                       return node.client();
+               } else {
+                       return null;
+               }
+       }
+
+       private static class PluginNode extends Node {
+               public PluginNode(Settings settings) {
+                       
super(InternalSettingsPreparer.prepareEnvironment(settings, null), 
Collections.<Class<? extends Plugin>>singletonList(Netty3Plugin.class));
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
index b4a370b..3ebda52 100644
--- 
a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,184 +17,54 @@
  */
 package org.apache.flink.streaming.connectors.elasticsearch5;
 
-import com.google.common.collect.ImmutableMap;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.network.NetworkModule;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.internal.InternalSettingsPreparer;
-import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.transport.Netty3Plugin;
-import org.junit.ClassRule;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase 
{
-
-       private static final int NUM_ELEMENTS = 20;
-
-       @ClassRule
-       public static TemporaryFolder tempFolder = new TemporaryFolder();
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 
        @Test
        public void testTransportClient() throws Exception {
-
-               File dataDir = tempFolder.newFolder();
-
-               Settings settings = Settings.builder()
-                       .put("cluster.name", "my-transport-client-cluster")
-                       .put("http.enabled", false)
-                       .put("path.home", dataDir.getParent())
-                       .put("path.data", dataDir.getAbsolutePath())
-                       .put(NetworkModule.HTTP_TYPE_KEY, 
Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
-                       .put(NetworkModule.TRANSPORT_TYPE_KEY, 
Netty3Plugin.NETTY_TRANSPORT_NAME)
-                       .build();
-
-               Node node = new PluginNode(settings);
-               node.start();
-
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               DataStreamSource<Tuple2<Integer, String>> source = 
env.addSource(new TestSourceFunction());
-
-               Map<String, String> esConfig = ImmutableMap.of("cluster.name", 
"my-transport-client-cluster");
-               Map<String, String> sinkConfig = 
ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-               List<InetSocketAddress> transports = new ArrayList<>();
-               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-               source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, 
transports, new TestElasticsearchSinkFunction()));
-
-               env.execute("Elasticsearch TransportClient Test");
-
-               // verify the results
-               Client client = node.client();
-               for (int i = 0; i < NUM_ELEMENTS; i++) {
-                       GetResponse response = client.prepareGet("my-index", 
"my-type", Integer.toString(i)).get();
-                       assertEquals("message #" + i, 
response.getSource().get("data"));
-               }
-
-               node.close();
+               runTransportClientTest();
        }
 
-       @Test(expected = IllegalArgumentException.class)
+       @Test
        public void testNullTransportClient() throws Exception {
-
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               DataStreamSource<Tuple2<Integer, String>> source = 
env.addSource(new TestSourceFunction());
-
-               Map<String, String> esConfig = ImmutableMap.of("cluster.name", 
"my-transport-client-cluster");
-               Map<String, String> sinkConfig = 
ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-               source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, 
null, new TestElasticsearchSinkFunction()));
-
-               fail();
+               runNullTransportClientTest();
        }
 
-       @Test(expected = IllegalArgumentException.class)
+       @Test
        public void testEmptyTransportClient() throws Exception {
-
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               DataStreamSource<Tuple2<Integer, String>> source = 
env.addSource(new TestSourceFunction());
-
-               Map<String, String> esConfig = ImmutableMap.of("cluster.name", 
"my-transport-client-cluster");
-               Map<String, String> sinkConfig = 
ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-               source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, 
new ArrayList<InetSocketAddress>(), new TestElasticsearchSinkFunction()));
-
-               env.execute("Elasticsearch TransportClient Test");
-
-               fail();
+               runEmptyTransportClientTest();
        }
 
-       @Test(expected = JobExecutionException.class)
+       @Test
        public void testTransportClientFails() throws Exception {
-               // this checks whether the TransportClient fails early when 
there is no cluster to
-               // connect to. There isn't a similar test for the Node Client 
version since that
-               // one will block and wait for a cluster to come online
-
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               DataStreamSource<Tuple2<Integer, String>> source = 
env.addSource(new TestSourceFunction());
-
-               Map<String, String> esConfig = ImmutableMap.of("cluster.name", 
"my-transport-client-cluster");
-               Map<String, String> sinkConfig = 
ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-               List<InetSocketAddress> transports = new ArrayList<>();
-               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-               source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, 
transports, new TestElasticsearchSinkFunction()));
-
-               env.execute("Elasticsearch Node Client Test");
-
-               fail();
+               runTransportClientFailsTest();
        }
 
-       private static class TestSourceFunction implements 
SourceFunction<Tuple2<Integer, String>> {
-               private static final long serialVersionUID = 1L;
-
-               private volatile boolean running = true;
-
-               @Override
-               public void run(SourceContext<Tuple2<Integer, String>> ctx) 
throws Exception {
-                       for (int i = 0; i < NUM_ELEMENTS && running; i++) {
-                               ctx.collect(Tuple2.of(i, "message #" + i));
-                       }
-               }
-
-               @Override
-               public void cancel() {
-                       running = false;
-               }
+       @Override
+       protected <T> ElasticsearchSinkBase<T> 
createElasticsearchSink(Map<String, String> userConfig,
+                                                                               
                                                List<InetSocketAddress> 
transportAddresses,
+                                                                               
                                                ElasticsearchSinkFunction<T> 
elasticsearchSinkFunction) {
+               return new ElasticsearchSink<>(userConfig, transportAddresses, 
elasticsearchSinkFunction);
        }
 
-       private static class TestElasticsearchSinkFunction implements 
ElasticsearchSinkFunction<Tuple2<Integer, String>> {
-               private static final long serialVersionUID = 1L;
-
-               public IndexRequest createIndexRequest(Tuple2<Integer, String> 
element) {
-                       Map<String, Object> json = new HashMap<>();
-                       json.put("data", element.f1);
+       @Override
+       protected <T> ElasticsearchSinkBase<T> 
createElasticsearchSinkForEmbeddedNode(
+               Map<String, String> userConfig, ElasticsearchSinkFunction<T> 
elasticsearchSinkFunction) throws Exception {
 
-                       return Requests.indexRequest()
-                               .index("my-index")
-                               .type("my-type")
-                               .id(element.f0.toString())
-                               .source(json);
-               }
+               List<InetSocketAddress> transports = new ArrayList<>();
+               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
 
-               @Override
-               public void process(Tuple2<Integer, String> element, 
RuntimeContext ctx, RequestIndexer indexer) {
-                       indexer.add(createIndexRequest(element));
-               }
+               return new ElasticsearchSink<>(userConfig, transports, 
elasticsearchSinkFunction);
        }
 
-       private static class PluginNode extends Node {
-               public PluginNode(Settings settings) {
-                       
super(InternalSettingsPreparer.prepareEnvironment(settings, null), 
Collections.<Class<? extends Plugin>>singletonList(Netty3Plugin.class));
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java
deleted file mode 100644
index 47ce846..0000000
--- 
a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch5.examples;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
-import 
org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch5.RequestIndexer;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you 
must ensure that
- * you have a cluster named "elasticsearch" running or change the name of 
cluster in the config map.
- */
-public class ElasticsearchExample {
-
-       public static void main(String[] args) throws Exception {
-
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               SingleOutputStreamOperator<String> source =
-                       env.generateSequence(0, 20).map(new MapFunction<Long, 
String>() {
-                               @Override
-                               public String map(Long value) throws Exception {
-                                       return "message #" + value;
-                               }
-                       });
-
-               Map<String, String> esConfig = ImmutableMap.of("cluster.name", 
"elasticsearch");
-
-               // This instructs the sink to emit after every element, 
otherwise they would be buffered
-               Map<String, String> sinkConfig = 
ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-               List<InetSocketAddress> transports = new ArrayList<>();
-               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-               source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, 
transports, new ElasticsearchSinkFunction<String>() {
-                       @Override
-                       public void process(String element, RuntimeContext ctx, 
RequestIndexer indexer) {
-                               indexer.add(createIndexRequest(element));
-                       }
-               }));
-
-               env.execute("Elasticsearch Example");
-       }
-
-       private static IndexRequest createIndexRequest(String element) {
-               Map<String, Object> json = new HashMap<>();
-               json.put("data", element);
-
-               return Requests.indexRequest()
-                       .index("my-index")
-                       .type("my-type")
-                       .id(element)
-                       .source(json);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
new file mode 100644
index 0000000..4135283
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it you 
must ensure that
+ * you have a cluster named "elasticsearch" running or change the name of 
cluster in the config map.
+ */
+public class ElasticsearchSinkExample {
+
+       public static void main(String[] args) throws Exception {
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<String> source = env.generateSequence(0, 20).map(new 
MapFunction<Long, String>() {
+                       @Override
+                       public String map(Long value) throws Exception {
+                               return "message #" + value;
+                       }
+               });
+
+               Map<String, String> userConfig = new HashMap<>();
+               userConfig.put("cluster.name", "elasticsearch");
+               // This instructs the sink to emit after every element, 
otherwise they would be buffered
+               
userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+               List<InetSocketAddress> transports = new ArrayList<>();
+               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+               source.addSink(new ElasticsearchSink<>(userConfig, transports, 
new ElasticsearchSinkFunction<String>() {
+                       @Override
+                       public void process(String element, RuntimeContext ctx, 
RequestIndexer indexer) {
+                               indexer.add(createIndexRequest(element));
+                       }
+               }));
+
+               env.execute("Elasticsearch Sink Example");
+       }
+
+       private static IndexRequest createIndexRequest(String element) {
+               Map<String, Object> json = new HashMap<>();
+               json.put("data", element);
+
+               return Requests.indexRequest()
+                       .index("my-index")
+                       .type("my-type")
+                       .id(element)
+                       .source(json);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j-test.properties
 
b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2055184
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties
 
b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties
deleted file mode 100644
index dc20726..0000000
--- 
a/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index e19c77f..5d8ca70 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -45,9 +45,9 @@ under the License.
                <module>flink-connector-kafka-0.8</module>
                <module>flink-connector-kafka-0.9</module>
                <module>flink-connector-kafka-0.10</module>
+               <module>flink-connector-elasticsearch-base</module>
                <module>flink-connector-elasticsearch</module>
                <module>flink-connector-elasticsearch2</module>
-               <module>flink-connector-elasticsearch5</module>
                <module>flink-connector-rabbitmq</module>
                <module>flink-connector-twitter</module>
                <module>flink-connector-nifi</module>
@@ -86,6 +86,20 @@ under the License.
                                <module>flink-connector-kinesis</module>
                        </modules>
                </profile>
+
+               <!--
+                       Since Elasticsearch 5.x requires Java 8 at a minimum, 
we use this profile
+                       to include it as part of Java 8 builds only.
+               -->
+               <profile>
+                       <id>include-elasticsearch5</id>
+                       <activation>
+                               <jdk>1.8</jdk>
+                       </activation>
+                       <modules>
+                               <module>flink-connector-elasticsearch5</module>
+                       </modules>
+               </profile>
        </profiles>
 
 </project>

Reply via email to