asfgit closed pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6.x
URL: https://github.com/apache/flink/pull/6391

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 52d1b58bf51..bafe391850b 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -55,6 +55,11 @@ of the Elasticsearch installation:
         <td>1.3.0</td>
         <td>5.x</td>
     </tr>
+    <tr>
+        <td>flink-connector-elasticsearch6{{ site.scala_version_suffix }}</td>
+        <td>1.6.0</td>
+        <td>6 and later versions</td>
+    </tr>
   </tbody>
 </table>
 
@@ -71,7 +76,7 @@ creating an `ElasticsearchSink` for requesting document actions against your clu
 
 ## Elasticsearch Sink
 
-The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an
 Elasticsearch cluster.
 
 The example below shows how to configure and create a sink:
@@ -79,6 +84,23 @@ The example below shows how to configure and create a sink:
 <div class="codetabs" markdown="1">
 <div data-lang="java, Elasticsearch 1.x" markdown="1">
 {% highlight java %}
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 DataStream<String> input = ...;
 
 Map<String, String> config = new HashMap<>();
@@ -110,6 +132,22 @@ input.addSink(new ElasticsearchSink<>(config, transportAddresses, new Elasticsea
 </div>
 <div data-lang="java, Elasticsearch 2.x / 5.x" markdown="1">
 {% highlight java %}
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 DataStream<String> input = ...;
 
 Map<String, String> config = new HashMap<>();
@@ -138,8 +176,85 @@ input.addSink(new ElasticsearchSink<>(config, transportAddresses, new Elasticsea
     }
 }));{% endhighlight %}
 </div>
+<div data-lang="java, Elasticsearch 6.x" markdown="1">
+{% highlight java %}
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+DataStream<String> input = ...;
+
+List<HttpHost> httpHosts = new ArrayList<>();
+httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
+httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
+
+// use a ElasticsearchSink.Builder to create an ElasticsearchSink
+ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
+    httpHosts,
+    new ElasticsearchSinkFunction<String>() {
+        public IndexRequest createIndexRequest(String element) {
+            Map<String, String> json = new HashMap<>();
+            json.put("data", element);
+        
+            return Requests.indexRequest()
+                    .index("my-index")
+                    .type("my-type")
+                    .source(json);
+        }
+        
+        @Override
+        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+            indexer.add(createIndexRequest(element));
+        }
+    }
+);
+
+// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
+esSinkBuilder.setBulkFlushMaxActions(1);
+
+// provide a RestClientFactory for custom configuration on the internally created REST client
+esSinkBuilder.setRestClientFactory(
+  restClientBuilder -> {
+    restClientBuilder.setDefaultHeaders(...)
+    restClientBuilder.setMaxRetryTimeoutMillis(...)
+    restClientBuilder.setPathPrefix(...)
+    restClientBuilder.setHttpClientConfigCallback(...)
+  }
+);
+
+// finally, build and add the sink to the job's pipeline
+input.addSink(esSinkBuilder.build());
+{% endhighlight %}
+</div>
 <div data-lang="scala, Elasticsearch 1.x" markdown="1">
 {% highlight scala %}
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
+
+import org.elasticsearch.action.index.IndexRequest
+import org.elasticsearch.client.Requests
+import org.elasticsearch.common.transport.InetSocketTransportAddress
+import org.elasticsearch.common.transport.TransportAddress
+
+import java.net.InetAddress
+import java.util.ArrayList
+import java.util.HashMap
+import java.util.List
+import java.util.Map
+
 val input: DataStream[String] = ...
 
 val config = new java.util.HashMap[String, String]
@@ -166,6 +281,22 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc
 </div>
 <div data-lang="scala, Elasticsearch 2.x / 5.x" markdown="1">
 {% highlight scala %}
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
+
+import org.elasticsearch.action.index.IndexRequest
+import org.elasticsearch.client.Requests
+
+import java.net.InetAddress
+import java.net.InetSocketAddress
+import java.util.ArrayList
+import java.util.HashMap
+import java.util.List
+import java.util.Map
+
 val input: DataStream[String] = ...
 
 val config = new java.util.HashMap[String, String]
@@ -190,14 +321,74 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc
 }))
 {% endhighlight %}
 </div>
+<div data-lang="scala, Elasticsearch 6.x" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
+
+import org.apache.http.HttpHost
+import org.elasticsearch.action.index.IndexRequest
+import org.elasticsearch.client.Requests
+
+import java.util.ArrayList
+import java.util.List
+
+val input: DataStream[String] = ...
+
+val httpHosts = new java.util.ArrayList[HttpHost]
+httpHosts.add(new HttpHost("127.0.0.1", 9300, "http"))
+httpHosts.add(new HttpHost("10.2.3.1", 9300, "http"))
+
+val esSinkBuilder = new ElasticsearchSink.Builder[String](
+  httpHosts,
+  new ElasticsearchSinkFunction[String] {
+    def createIndexRequest(element: String): IndexRequest = {
+      val json = new java.util.HashMap[String, String]
+      json.put("data", element)
+      
+      return Requests.indexRequest()
+              .index("my-index")
+              .type("my-type")
+              .source(json)
+    }
+
+    override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
+      indexer.add(createIndexRequest(element))
+    }
+  }
+)
+
+// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
+esSinkBuilder.setBulkFlushMaxActions(1)
+
+// provide a RestClientFactory for custom configuration on the internally created REST client
+esSinkBuilder.setRestClientFactory(
+  restClientBuilder => {
+    restClientBuilder.setDefaultHeaders(...)
+    restClientBuilder.setMaxRetryTimeoutMillis(...)
+    restClientBuilder.setPathPrefix(...)
+    restClientBuilder.setHttpClientConfigCallback(...)
+  }
+)
+
+// finally, build and add the sink to the job's pipeline
+input.addSink(esSinkBuilder.build)
+{% endhighlight %}
+</div>
 </div>
 
-Note how a `Map` of `String`s is used to configure the `ElasticsearchSink`.
+For Elasticsearch versions that still use the now deprecated `TransportClient` to communicate
+with the Elasticsearch cluster (i.e., versions equal or below 5.x), note how a `Map` of `String`s
+is used to configure the `ElasticsearchSink`. This config map will be directly
+forwarded when creating the internally used `TransportClient`.
 The configuration keys are documented in the Elasticsearch documentation
 [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
 Especially important is the `cluster.name` parameter that must correspond to
 the name of your cluster.
 
+For Elasticsearch 6.x and above, internally, the `RestHighLevelClient` is used for cluster communication.
+By default, the connector uses the default configurations for the REST client. To have custom
+configuration for the REST client, users can provide a `RestClientFactory` implementation when
+setting up the `ElasticsearchSink.Builder` that builds the sink.
+
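For illustration, a minimal sketch of a custom `RestClientFactory` (not part of the PR diff above). It assumes the interface exposes a single `configureRestClientBuilder(RestClientBuilder)` method; the header name, header value, and path prefix below are made-up example values.

import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;

import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClientBuilder;

// Attaches a default header and a path prefix to every request issued by the
// internally created low-level REST client. "X-Api-Key" and "/elasticsearch"
// are hypothetical example values, not connector defaults.
public class ExampleRestClientFactory implements RestClientFactory {

    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        restClientBuilder.setDefaultHeaders(
            new Header[] { new BasicHeader("X-Api-Key", "my-api-key") });
        restClientBuilder.setPathPrefix("/elasticsearch");
    }
}

// usage: esSinkBuilder.setRestClientFactory(new ExampleRestClientFactory());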
 Also note that the example only demonstrates performing a single index
 request for each incoming element. Generally, the `ElasticsearchSinkFunction`
 can be used to perform multiple requests of different types (ex.,
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
index 2ebb97c82e2..33b42cb47f1 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
@@ -22,6 +22,9 @@
 
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -45,12 +48,32 @@
        }
 
        @Override
-       public void add(ActionRequest... actionRequests) {
-               for (ActionRequest actionRequest : actionRequests) {
+       public void add(DeleteRequest... deleteRequests) {
+               for (DeleteRequest deleteRequest : deleteRequests) {
                        if (flushOnCheckpoint) {
                                numPendingRequestsRef.getAndIncrement();
                        }
-                       this.bulkProcessor.add(actionRequest);
+                       this.bulkProcessor.add(deleteRequest);
+               }
+       }
+
+       @Override
+       public void add(IndexRequest... indexRequests) {
+               for (IndexRequest indexRequest : indexRequests) {
+                       if (flushOnCheckpoint) {
+                               numPendingRequestsRef.getAndIncrement();
+                       }
+                       this.bulkProcessor.add(indexRequest);
+               }
+       }
+
+       @Override
+       public void add(UpdateRequest... updateRequests) {
+               for (UpdateRequest updateRequest : updateRequests) {
+                       if (flushOnCheckpoint) {
+                               numPendingRequestsRef.getAndIncrement();
+                       }
+                       this.bulkProcessor.add(updateRequest);
                }
        }
 }
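For orientation, a short sketch of how a sink function would exercise the typed add(...) overloads introduced above (a usage illustration, not code from this PR; the index and type names are placeholders):

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;

import java.util.Collections;

// Emits an index request plus an upsert per element; each add(...) call below
// resolves to one of the typed overloads rather than the raw ActionRequest one.
public class ExampleUpsertSinkFunction implements ElasticsearchSinkFunction<String> {

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        IndexRequest index = Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(Collections.singletonMap("data", element));

        UpdateRequest upsert = new UpdateRequest("my-index", "my-type", element)
                .doc(Collections.singletonMap("data", element))
                .docAsUpsert(true);

        indexer.add(index);   // dispatched to add(IndexRequest...)
        indexer.add(upsert);  // dispatched to add(UpdateRequest...)
    }
}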
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index 2a7a21659e4..f1dcc83f652 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -22,10 +22,10 @@
 
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.client.Client;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 
@@ -37,17 +37,28 @@
  * <p>Implementations are allowed to be stateful. For example, for Elasticsearch 1.x, since connecting via an embedded node
  * is allowed, the call bridge will hold reference to the created embedded node. Each instance of the sink will hold
  * exactly one instance of the call bridge, and state cleanup is performed when the sink is closed.
+ *
+ * @param <C> The Elasticsearch client, that implements {@link AutoCloseable}.
  */
 @Internal
-public interface ElasticsearchApiCallBridge extends Serializable {
+public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Serializable {
 
        /**
-        * Creates an Elasticsearch {@link Client}.
+        * Creates an Elasticsearch client implementing {@link AutoCloseable}.
         *
         * @param clientConfig The configuration to use when constructing the client.
         * @return The created client.
         */
-       Client createClient(Map<String, String> clientConfig);
+       C createClient(Map<String, String> clientConfig) throws IOException;
+
+       /**
+        * Creates a {@link BulkProcessor.Builder} for creating the bulk processor.
+        *
+        * @param client the Elasticsearch client.
+        * @param listener the bulk processor listener.
+        * @return the bulk processor builder.
+        */
+       BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener);
 
        /**
         * Extracts the cause of failure of a bulk item action.
@@ -71,6 +82,8 @@ void configureBulkProcessorBackoff(
        /**
         * Perform any necessary state cleanup.
         */
-       void cleanup();
+       default void cleanup() {
+               // nothing to cleanup by default
+       }
 
 }
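To illustrate the reworked interface, a hedged sketch of what a bridge for the 6.x REST-based client could look like. This is not the PR's actual Elasticsearch6ApiCallBridge; the class name, constructor, and method bodies are assumptions for illustration only:

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;

import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.List;
import java.util.Map;

// Sketch only: binds the generic parameter to RestHighLevelClient, which
// implements AutoCloseable, and builds the BulkProcessor from the client's
// bulkAsync method instead of the TransportClient-based builder.
@Internal
class SketchedElasticsearch6ApiCallBridge implements ElasticsearchApiCallBridge<RestHighLevelClient> {

    private final List<HttpHost> httpHosts;

    SketchedElasticsearch6ApiCallBridge(List<HttpHost> httpHosts) {
        this.httpHosts = httpHosts;
    }

    @Override
    public RestHighLevelClient createClient(Map<String, String> clientConfig) throws IOException {
        return new RestHighLevelClient(RestClient.builder(httpHosts.toArray(new HttpHost[0])));
    }

    @Override
    public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient client, BulkProcessor.Listener listener) {
        return BulkProcessor.builder(client::bulkAsync, listener);
    }

    @Nullable
    @Override
    public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
        return bulkItemResponse.isFailed() ? bulkItemResponse.getFailure().getCause() : null;
    }

    @Override
    public void configureBulkProcessorBackoff(BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
        // left as a no-op in this sketch; a real bridge would translate the
        // policy into an Elasticsearch BackoffPolicy here
    }

    // cleanup() falls back to the new default no-op implementation
}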
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 9105d9947f2..7dac06ceb8a 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.elasticsearch;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
@@ -62,9 +63,10 @@
  * for example, to create a Elasticsearch {@link Client}, handle failed item responses, etc.
  *
  * @param <T> Type of the elements handled by this sink
+ * @param <C> Type of the Elasticsearch client, which implements {@link AutoCloseable}
  */
 @Internal
-public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> implements CheckpointedFunction {
+public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends RichSinkFunction<T> implements CheckpointedFunction {
 
        private static final long serialVersionUID = -1007596293618451942L;
 
@@ -85,6 +87,7 @@
        /**
         * Used to control whether the retry delay should increase exponentially or remain constant.
         */
+       @PublicEvolving
        public enum FlushBackoffType {
                CONSTANT,
                EXPONENTIAL
@@ -135,14 +138,20 @@ public void setDelayMillis(long delayMillis) {
 
        private final Integer bulkProcessorFlushMaxActions;
        private final Integer bulkProcessorFlushMaxSizeMb;
-       private final Integer bulkProcessorFlushIntervalMillis;
+       private final Long bulkProcessorFlushIntervalMillis;
        private final BulkFlushBackoffPolicy bulkProcessorFlushBackoffPolicy;
 
        // ------------------------------------------------------------------------
        //  User-facing API and configuration
        // ------------------------------------------------------------------------
 
-       /** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */
+       /**
+        * The config map that contains configuration for the bulk flushing behaviours.
+        *
+        * <p>For {@link org.elasticsearch.client.transport.TransportClient} based implementations, this config
+        * map would also contain Elasticsearch-shipped configuration, and therefore this config map
+        * would also be forwarded when creating the Elasticsearch client.
+        */
        private final Map<String, String> userConfig;
 
        /** The function that is used to construct multiple {@link ActionRequest ActionRequests} from each incoming element. */
@@ -162,7 +171,7 @@ public void setDelayMillis(long delayMillis) {
        // ------------------------------------------------------------------------
 
        /** Call bridge for different version-specific. */
-       private final ElasticsearchApiCallBridge callBridge;
+       private final ElasticsearchApiCallBridge<C> callBridge;
 
        /**
         * Number of pending action requests not yet acknowledged by Elasticsearch.
@@ -176,7 +185,7 @@ public void setDelayMillis(long delayMillis) {
        private AtomicLong numPendingRequests = new AtomicLong(0);
 
        /** Elasticsearch client created using the call bridge. */
-       private transient Client client;
+       private transient C client;
 
        /** Bulk processor to buffer and send requests to Elasticsearch, created using the client. */
        private transient BulkProcessor bulkProcessor;
@@ -237,7 +246,7 @@ public ElasticsearchSinkBase(
                }
 
                if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
-                       bulkProcessorFlushIntervalMillis = params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
+                       bulkProcessorFlushIntervalMillis = params.getLong(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
                        userConfig.remove(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
                } else {
                        bulkProcessorFlushIntervalMillis = null;
@@ -341,7 +350,7 @@ public void close() throws Exception {
        protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
                checkNotNull(listener);
 
-               BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, listener);
+               BulkProcessor.Builder bulkProcessorBuilder = callBridge.createBulkProcessorBuilder(client, listener);
 
                // This makes flush() blocking
                bulkProcessorBuilder.setConcurrentRequests(0);
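One practical consequence of the getInt -> getLong change above: the flush-interval key now accepts values beyond Integer.MAX_VALUE. A small usage sketch (key constants as referenced in this diff; the values are examples):

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;

import java.util.HashMap;
import java.util.Map;

Map<String, String> userConfig = new HashMap<>();
// consumed by the sink itself and removed from the map before the remaining
// entries are forwarded to a TransportClient-based implementation
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
// parsed with getLong() after this change, so long-range intervals are valid
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "60000");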
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
index 2a1b29736b6..3dc8f879641 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
@@ -21,9 +21,12 @@
 import org.apache.flink.annotation.PublicEvolving;
 
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
@@ -33,6 +36,41 @@
         * Add multiple {@link ActionRequest} to the indexer to prepare for sending requests to Elasticsearch.
         *
         * @param actionRequests The multiple {@link ActionRequest} to add.
+        * @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or {@link UpdateRequest}
         */
-       void add(ActionRequest... actionRequests);
+       @Deprecated
+       default void add(ActionRequest... actionRequests) {
+               for (ActionRequest actionRequest : actionRequests) {
+                       if (actionRequest instanceof IndexRequest) {
+                               add((IndexRequest) actionRequest);
+                       } else if (actionRequest instanceof DeleteRequest) {
+                               add((DeleteRequest) actionRequest);
+                       } else if (actionRequest instanceof UpdateRequest) {
+                               add((UpdateRequest) actionRequest);
+                       } else {
+                               throw new IllegalArgumentException("RequestIndexer only supports Index, Delete and Update requests");
+                       }
+               }
+       }
+
+       /**
+        * Add multiple {@link DeleteRequest} to the indexer to prepare for sending requests to Elasticsearch.
+        *
+        * @param deleteRequests The multiple {@link DeleteRequest} to add.
+        */
+       void add(DeleteRequest... deleteRequests);
+
+       /**
+        * Add multiple {@link IndexRequest} to the indexer to prepare for sending requests to Elasticsearch.
+        *
+        * @param indexRequests The multiple {@link IndexRequest} to add.
+        */
+       void add(IndexRequest... indexRequests);
+
+       /**
+        * Add multiple {@link UpdateRequest} to the indexer to prepare for sending requests to Elasticsearch.
+        *
+        * @param updateRequests The multiple {@link UpdateRequest} to add.
+        */
+       void add(UpdateRequest... updateRequests);
 }
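The deprecated default method keeps old call sites source-compatible. A brief sketch of the dispatch behaviour, assuming `indexer` is the RequestIndexer handed to an ElasticsearchSinkFunction and the index/type names are placeholders:

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

// Pre-existing code that adds requests through the ActionRequest overload still
// compiles; the default method inspects the runtime type and forwards the call.
ActionRequest legacy = Requests.indexRequest().index("my-index").type("my-type");
indexer.add(legacy);  // forwarded to add(IndexRequest...) at runtime

// New code can call the typed overload directly and skip the instanceof checks.
indexer.add(Requests.indexRequest().index("my-index").type("my-type"));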
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
index 09d8806b963..369d26a735a 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
@@ -31,6 +31,7 @@
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
 import org.junit.Assert;
@@ -92,7 +93,7 @@ public void testItemFailureRethrownOnInvoke() throws Throwable {
                // setup the next bulk request, and its mock item failures
                sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
                testHarness.processElement(new StreamRecord<>("msg"));
-               verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
                // manually execute the next bulk request
                sink.manualBulkRequestWithAllPendingRequests();
@@ -124,7 +125,7 @@ public void testItemFailureRethrownOnCheckpoint() throws Throwable {
                // setup the next bulk request, and its mock item failures
                sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
                testHarness.processElement(new StreamRecord<>("msg"));
-               verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
                // manually execute the next bulk request
                sink.manualBulkRequestWithAllPendingRequests();
@@ -164,7 +165,7 @@ public void testItemFailureRethrownOnCheckpointAfterFlush() throws Throwable {
                sink.setMockItemFailuresListForNextBulkItemResponses(mockResponsesList);

                testHarness.processElement(new StreamRecord<>("msg-1"));
-               verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
                // manually execute the next bulk request (1 request only, thus should succeed)
                sink.manualBulkRequestWithAllPendingRequests();
@@ -172,7 +173,7 @@ public void testItemFailureRethrownOnCheckpointAfterFlush() throws Throwable {
                // setup the requests to be flushed in the snapshot
                testHarness.processElement(new StreamRecord<>("msg-2"));
                testHarness.processElement(new StreamRecord<>("msg-3"));
-               verify(sink.getMockBulkProcessor(), times(3)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), times(3)).add(any(IndexRequest.class));
 
                CheckedThread snapshotThread = new CheckedThread() {
                        @Override
@@ -217,7 +218,7 @@ public void testBulkFailureRethrownOnInvoke() throws Throwable {
                // setup the next bulk request, and let the whole bulk request fail
                sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
                testHarness.processElement(new StreamRecord<>("msg"));
-               verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
                // manually execute the next bulk request
                sink.manualBulkRequestWithAllPendingRequests();
@@ -249,7 +250,7 @@ public void testBulkFailureRethrownOnCheckpoint() throws Throwable {
                // setup the next bulk request, and let the whole bulk request fail
                sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
                testHarness.processElement(new StreamRecord<>("msg"));
-               verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
                // manually execute the next bulk request
                sink.manualBulkRequestWithAllPendingRequests();
@@ -284,7 +285,7 @@ public void testBulkFailureRethrownOnOnCheckpointAfterFlush() throws Throwable {
                // setup the next bulk request, and let bulk request succeed
                sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList((Exception) null));
                testHarness.processElement(new StreamRecord<>("msg-1"));
-               verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
                // manually execute the next bulk request
                sink.manualBulkRequestWithAllPendingRequests();
@@ -292,7 +293,7 @@ public void testBulkFailureRethrownOnOnCheckpointAfterFlush() throws Throwable {
                // setup the requests to be flushed in the snapshot
                testHarness.processElement(new StreamRecord<>("msg-2"));
                testHarness.processElement(new StreamRecord<>("msg-3"));
-               verify(sink.getMockBulkProcessor(), times(3)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), times(3)).add(any(IndexRequest.class));
 
                CheckedThread snapshotThread = new CheckedThread() {
                        @Override
@@ -346,7 +347,7 @@ public void testAtLeastOnceSink() throws Throwable {
                // it contains 1 request, which will fail and re-added to the next bulk request
                sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
                testHarness.processElement(new StreamRecord<>("msg"));
-               verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
                CheckedThread snapshotThread = new CheckedThread() {
                        @Override
@@ -402,7 +403,7 @@ public void testDoesNotWaitForPendingRequestsIfFlushingDisabled() throws Excepti
                // setup the next bulk request, and let bulk request succeed
                sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
                testHarness.processElement(new StreamRecord<>("msg-1"));
-               verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
                // the snapshot should not block even though we haven't flushed the bulk request
                testHarness.snapshot(1L, 1000L);
@@ -410,7 +411,7 @@ public void testDoesNotWaitForPendingRequestsIfFlushingDisabled() throws Excepti
                testHarness.close();
        }
 
-       private static class DummyElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+       private static class DummyElasticsearchSink<T> extends ElasticsearchSinkBase<T, Client> {
 
                private static final long serialVersionUID = 5051907841570096991L;
 
@@ -478,11 +479,11 @@ public BulkProcessor getMockBulkProcessor() {
                protected BulkProcessor buildBulkProcessor(final BulkProcessor.Listener listener) {
                        this.mockBulkProcessor = mock(BulkProcessor.class);
 
-                       when(mockBulkProcessor.add(any(ActionRequest.class))).thenAnswer(new Answer<Object>() {
+                       when(mockBulkProcessor.add(any(IndexRequest.class))).thenAnswer(new Answer<Object>() {
                                @Override
                                public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                                        // intercept the request and add it to our mock bulk request
-                                       nextBulkRequest.add(invocationOnMock.getArgumentAt(0, ActionRequest.class));
+                                       nextBulkRequest.add(invocationOnMock.getArgumentAt(0, IndexRequest.class));
 
                                        return null;
                                }
@@ -530,7 +531,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                }
        }
 
-       private static class DummyElasticsearchApiCallBridge implements ElasticsearchApiCallBridge {
+       private static class DummyElasticsearchApiCallBridge implements ElasticsearchApiCallBridge<Client> {
 
                private static final long serialVersionUID = -4272760730959041699L;
 
@@ -539,6 +540,11 @@ public Client createClient(Map<String, String> clientConfig) {
                        return mock(Client.class);
                }
 
+               @Override
+               public BulkProcessor.Builder createBulkProcessorBuilder(Client client, BulkProcessor.Listener listener) {
+                       return null;
+               }
+
                @Nullable
                @Override
                public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
@@ -553,11 +559,6 @@ public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkIt
                public void configureBulkProcessorBackoff(BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
                        // no need for this in the test cases here
                }
-
-               @Override
-               public void cleanup() {
-                       // nothing to cleanup
-               }
        }
 
        private static class SimpleSinkFunction<String> implements ElasticsearchSinkFunction<String> {
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
index df3779b1fd4..819ffba5d2a 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
@@ -26,7 +26,6 @@
 import org.apache.flink.util.InstantiationUtil;
 
 import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -34,19 +33,20 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
  * Environment preparation and suite of tests for version-specific {@link ElasticsearchSinkBase} implementations.
+ *
+ * @param <C> Elasticsearch client type
+ * @param <A> The address type to use
  */
-public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
+public abstract class ElasticsearchSinkTestBase<C extends AutoCloseable, A> extends AbstractTestBase {
 
        private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkTestBase.class);
 
@@ -85,24 +85,21 @@ public static void shutdown() throws Exception {
        }
 
        /**
-        * Tests that the Elasticsearch sink works properly using a {@link TransportClient}.
+        * Tests that the Elasticsearch sink works properly.
         */
-       public void runTransportClientTest() throws Exception {
-               final String index = "transport-client-test-index";
+       public void runElasticsearchSinkTest() throws Exception {
+               final String index = "elasticsearch-sink-test-index";
 
                final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
                DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
 
-               Map<String, String> userConfig = new HashMap<>();
-               // This instructs the sink to emit after every element, otherwise they would be buffered
-               userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-               userConfig.put("cluster.name", CLUSTER_NAME);
-
                source.addSink(createElasticsearchSinkForEmbeddedNode(
-                       userConfig, new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
+                               1,
+                               CLUSTER_NAME,
+                               new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
 
-               env.execute("Elasticsearch TransportClient Test");
+               env.execute("Elasticsearch Sink Test");
 
                // verify the results
                Client client = embeddedNodeEnv.getClient();
@@ -112,16 +109,20 @@ public void runTransportClientTest() throws Exception {
        }
 
        /**
-        * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is {@code null}.
+        * Tests that the Elasticsearch sink fails eagerly if the provided list of addresses is {@code null}.
         */
-       public void runNullTransportClientTest() throws Exception {
+       public void runNullAddressesTest() throws Exception {
                Map<String, String> userConfig = new HashMap<>();
                userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-               userConfig.put("cluster.name", "my-transport-client-cluster");
+               userConfig.put("cluster.name", CLUSTER_NAME);
 
                try {
-                       createElasticsearchSink(userConfig, null, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
-               } catch (IllegalArgumentException expectedException) {
+                       createElasticsearchSink(
+                                       1,
+                                       CLUSTER_NAME,
+                                       null,
+                                       new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+               } catch (IllegalArgumentException | NullPointerException expectedException) {
                        // test passes
                        return;
                }
@@ -130,18 +131,19 @@ public void runNullTransportClientTest() throws Exception {
        }
 
        /**
-        * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is empty.
+        * Tests that the Elasticsearch sink fails eagerly if the provided list of addresses is empty.
         */
-       public void runEmptyTransportClientTest() throws Exception {
+       public void runEmptyAddressesTest() throws Exception {
                Map<String, String> userConfig = new HashMap<>();
                userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-               userConfig.put("cluster.name", "my-transport-client-cluster");
+               userConfig.put("cluster.name", CLUSTER_NAME);
 
                try {
                        createElasticsearchSink(
-                               userConfig,
-                               Collections.<InetSocketAddress>emptyList(),
-                               new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+                                       1,
+                                       CLUSTER_NAME,
+                                       Collections.emptyList(),
+                                       new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
                } catch (IllegalArgumentException expectedException) {
                        // test passes
                        return;
@@ -153,39 +155,66 @@ public void runEmptyTransportClientTest() throws Exception {
        /**
         * Tests whether the Elasticsearch sink fails when there is no cluster to connect to.
         */
-       public void runTransportClientFailsTest() throws Exception {
+       public void runInvalidElasticsearchClusterTest() throws Exception {
                final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
                DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
 
                Map<String, String> userConfig = new HashMap<>();
                userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-               userConfig.put("cluster.name", "my-transport-client-cluster");
+               userConfig.put("cluster.name", "invalid-cluster-name");
 
-               source.addSink(createElasticsearchSinkForEmbeddedNode(
-                       Collections.unmodifiableMap(userConfig), new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));
+               source.addSink(createElasticsearchSinkForNode(
+                               1,
+                               "invalid-cluster-name",
+                               new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"),
+                               "123.123.123.123")); // incorrect ip address
 
                try {
-                       env.execute("Elasticsearch Transport Client Test");
+                       env.execute("Elasticsearch Sink Test");
                } catch (JobExecutionException expectedException) {
-                       assertTrue(expectedException.getCause().getMessage().contains("not connected to any Elasticsearch nodes"));
+                       // test passes
                        return;
                }
 
                fail();
        }
 
+       /**
+        * Utility method to create a user config map.
+        */
+       protected Map<String, String> createUserConfig(int bulkFlushMaxActions, String clusterName) {
+               Map<String, String> userConfig = new HashMap<>();
+               userConfig.put("cluster.name", clusterName);
+               userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(bulkFlushMaxActions));
+
+               return userConfig;
+       }
+
        /** Creates a version-specific Elasticsearch sink, using arbitrary transport addresses. */
-       protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
-                                                                               List<InetSocketAddress> transportAddresses,
-                                                                               ElasticsearchSinkFunction<T> elasticsearchSinkFunction);
+       protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSink(
+                       int bulkFlushMaxActions,
+                       String clusterName,
+                       List<A> addresses,
+                       ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction);
 
        /**
         * Creates a version-specific Elasticsearch sink to connect to a local embedded Elasticsearch node.
         *
-        * <p>This case is singled out from {@link ElasticsearchSinkTestBase#createElasticsearchSink(Map, List, ElasticsearchSinkFunction)}
+        * <p>This case is singled out from {@link ElasticsearchSinkTestBase#createElasticsearchSink(int, String, List, ElasticsearchSinkFunction)}
         * because the Elasticsearch Java API to do so is incompatible across different versions.
         */
-       protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
-               Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception;
+       protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSinkForEmbeddedNode(
+                       int bulkFlushMaxActions,
+                       String clusterName,
+                       ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception;
+
+       /**
+        * Creates a version-specific Elasticsearch sink to connect to a specific Elasticsearch node.
+        */
+       protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSinkForNode(
+                       int bulkFlushMaxActions,
+                       String clusterName,
+                       ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
+                       String ipAddress) throws Exception;
 }
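For orientation, a hedged sketch of how a 6.x IT case might bind the new type parameters and abstract factories. The actual 6.x test is not part of this excerpt; the class name is made up, and the ElasticsearchSink.Builder usage follows the documentation examples above:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestHighLevelClient;

import java.util.Collections;
import java.util.List;

// Sketch: C = RestHighLevelClient and A = HttpHost for the REST-based connector.
public class SketchedElasticsearch6SinkITCase extends ElasticsearchSinkTestBase<RestHighLevelClient, HttpHost> {

    @Override
    protected ElasticsearchSinkBase<Tuple2<Integer, String>, RestHighLevelClient> createElasticsearchSink(
            int bulkFlushMaxActions,
            String clusterName,
            List<HttpHost> httpHosts,
            ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) {

        ElasticsearchSink.Builder<Tuple2<Integer, String>> builder =
                new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);
        builder.setBulkFlushMaxActions(bulkFlushMaxActions);
        return builder.build();
    }

    @Override
    protected ElasticsearchSinkBase<Tuple2<Integer, String>, RestHighLevelClient> createElasticsearchSinkForEmbeddedNode(
            int bulkFlushMaxActions,
            String clusterName,
            ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception {

        // delegate to the per-node factory, pointing at the local embedded node
        return createElasticsearchSinkForNode(
                bulkFlushMaxActions, clusterName, elasticsearchSinkFunction, "127.0.0.1");
    }

    @Override
    protected ElasticsearchSinkBase<Tuple2<Integer, String>, RestHighLevelClient> createElasticsearchSinkForNode(
            int bulkFlushMaxActions,
            String clusterName,
            ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
            String ipAddress) throws Exception {

        return createElasticsearchSink(
                bulkFlushMaxActions,
                clusterName,
                Collections.singletonList(new HttpHost(ipAddress, 9200, "http")),
                elasticsearchSinkFunction);
    }
}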
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
index ea6e7a3ac70..fd14ba36370 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
@@ -29,7 +29,7 @@
  *       also be located under the same package. The intentional package-private accessibility of this interface
  *       enforces that.
  */
-interface EmbeddedElasticsearchNodeEnvironment {
+public interface EmbeddedElasticsearchNodeEnvironment {
 
        /**
         * Start an embedded Elasticsearch node instance.
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
index 2a3c2a06460..4f1cd086d8f 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
@@ -42,7 +42,7 @@
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 1.x.
  */
 @Internal
-public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge {
+public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge<Client> {
 
        private static final long serialVersionUID = -2632363720584123682L;
 
@@ -115,6 +115,11 @@ public Client createClient(Map<String, String> clientConfig) {
                }
        }
 
+       @Override
+       public BulkProcessor.Builder createBulkProcessorBuilder(Client client, BulkProcessor.Listener listener) {
+               return BulkProcessor.builder(client, listener);
+       }
+
        @Override
        public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
                if (!bulkItemResponse.isFailed()) {
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
index e8eccd978f4..d5e1d1fdc12 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
@@ -23,6 +23,7 @@
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.node.Node;
@@ -64,7 +65,7 @@
  * @param <T> Type of the elements handled by this sink
  */
 @PublicEvolving
-public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, Client> {
 
        private static final long serialVersionUID = 1L;
 
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
index 54892909abc..2f1a65c58ee 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
@@ -28,10 +28,12 @@
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.transport.LocalTransportAddress;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.junit.Test;
 
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -42,26 +44,26 @@
 /**
  * IT Cases for the {@link ElasticsearchSink}.
  */
-public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<Client, InetSocketAddress> {
 
        @Test
-       public void testTransportClient() throws Exception {
-               runTransportClientTest();
+       public void testElasticsearchSink() throws Exception {
+               runElasticsearchSinkTest();
        }
 
        @Test
-       public void testNullTransportClient() throws Exception {
-               runNullTransportClientTest();
+       public void testNullAddresses() throws Exception {
+               runNullAddressesTest();
        }
 
        @Test
-       public void testEmptyTransportClient() throws Exception {
-               runEmptyTransportClientTest();
+       public void testEmptyAddresses() throws Exception {
+               runEmptyAddressesTest();
        }
 
        @Test
-       public void testTransportClientFails() throws Exception{
-               runTransportClientFailsTest();
+       public void testInvalidElasticsearchCluster() throws Exception{
+               runInvalidElasticsearchClusterTest();
        }
 
        // -- Tests specific to Elasticsearch 1.x --
@@ -102,19 +104,28 @@ public void testDeprecatedIndexRequestBuilderVariant() throws Exception {
        }
 
        @Override
-       protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
-                                                                               List<InetSocketAddress> transportAddresses,
-                                                                               ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-               return new ElasticsearchSink<>(userConfig, ElasticsearchUtils.convertInetSocketAddresses(transportAddresses), elasticsearchSinkFunction);
+       protected ElasticsearchSinkBase<Tuple2<Integer, String>, Client> createElasticsearchSink(
+                       int bulkFlushMaxActions,
+                       String clusterName,
+                       List<InetSocketAddress> transportAddresses,
+                       ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) {
+
+               return new ElasticsearchSink<>(
+                               Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+                               ElasticsearchUtils.convertInetSocketAddresses(transportAddresses),
+                               elasticsearchSinkFunction);
        }
 
        @Override
-       protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
-               Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
+       protected ElasticsearchSinkBase<Tuple2<Integer, String>, Client> createElasticsearchSinkForEmbeddedNode(
+                       int bulkFlushMaxActions,
+                       String clusterName,
+                       ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception {
+
+               Map<String, String> userConfig = createUserConfig(bulkFlushMaxActions, clusterName);
 
                // Elasticsearch 1.x requires this setting when using
                // LocalTransportAddress to connect to a local embedded node
-               userConfig = new HashMap<>(userConfig);
                userConfig.put("node.local", "true");
 
                List<TransportAddress> transports = new ArrayList<>();
@@ -126,6 +137,22 @@ public void testDeprecatedIndexRequestBuilderVariant() throws Exception {
                        elasticsearchSinkFunction);
        }
 
+       @Override
+       protected ElasticsearchSinkBase<Tuple2<Integer, String>, Client> createElasticsearchSinkForNode(
+                       int bulkFlushMaxActions,
+                       String clusterName,
+                       ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
+                       String ipAddress) throws Exception {
+
+               List<TransportAddress> transports = new ArrayList<>();
+               transports.add(new InetSocketTransportAddress(InetAddress.getByName(ipAddress), 9300));
+
+               return new ElasticsearchSink<>(
+                       Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+                       transports,
+                       elasticsearchSinkFunction);
+       }
+
        /**
         * A {@link IndexRequestBuilder} with equivalent functionality to {@link SourceSinkDataTestKit.TestElasticsearchSinkFunction}.
         */
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
index 390a4078e2b..73a69ebde34 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
@@ -26,7 +26,6 @@
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
@@ -44,7 +43,7 @@
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 2.x.
  */
 @Internal
-public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge 
{
+public class Elasticsearch2ApiCallBridge implements 
ElasticsearchApiCallBridge<TransportClient> {
 
        private static final long serialVersionUID = 2638252694744361079L;
 
@@ -63,7 +62,7 @@
        }
 
        @Override
-       public Client createClient(Map<String, String> clientConfig) {
+       public TransportClient createClient(Map<String, String> clientConfig) {
                Settings settings = 
Settings.settingsBuilder().put(clientConfig).build();
 
                TransportClient transportClient = 
TransportClient.builder().settings(settings).build();
@@ -83,6 +82,11 @@ public Client createClient(Map<String, String> clientConfig) 
{
                return transportClient;
        }
 
+       @Override
+       public BulkProcessor.Builder createBulkProcessorBuilder(TransportClient 
client, BulkProcessor.Listener listener) {
+               return BulkProcessor.builder(client, listener);
+       }
+
        @Override
        public Throwable 
extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
                if (!bulkItemResponse.isFailed()) {
@@ -117,10 +121,4 @@ public void configureBulkProcessorBackoff(
 
                builder.setBackoffPolicy(backoffPolicy);
        }
-
-       @Override
-       public void cleanup() {
-               // nothing to cleanup
-       }
-
 }
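
The 2.x bridge above shows the pattern repeated for each version in this PR: the call
bridge is now generic over the concrete client it creates. The bridge interface itself
is outside this excerpt, but its shape can be inferred from the implementations; a
hedged reconstruction (signatures inferred from the 2.x/5.x/6.x bridges, not quoted
from the actual source):

    import org.elasticsearch.action.bulk.BulkProcessor;

    import java.io.IOException;
    import java.io.Serializable;
    import java.util.Map;

    // Inferred sketch, not the verbatim interface: the bound on C assumes the
    // sink base closes the client via AutoCloseable.
    public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Serializable {

        // each version-specific bridge creates and returns its own client type
        C createClient(Map<String, String> clientConfig) throws IOException;

        // adapts the client to a BulkProcessor.Builder; see the 6.x bridge further down
        BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener);

        // the failure-extraction and backoff-configuration callbacks visible in
        // the implementations are omitted here for brevity
    }
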
diff --git 
a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 
b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
index ffccacf40ac..a911905ac0a 100644
--- 
a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
+++ 
b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
@@ -58,7 +58,7 @@
  * @param <T> Type of the elements handled by this sink
  */
 @PublicEvolving
-public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, 
TransportClient> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
 
b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
index 7ded893be3a..7887e72fa10 100644
--- 
a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
@@ -17,57 +17,81 @@
 
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
 
+import org.elasticsearch.client.transport.TransportClient;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 /**
  * IT cases for the {@link ElasticsearchSink}.
  */
-public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+public class ElasticsearchSinkITCase extends 
ElasticsearchSinkTestBase<TransportClient, InetSocketAddress> {
 
        @Test
-       public void testTransportClient() throws Exception {
-               runTransportClientTest();
+       public void testElasticsearchSink() throws Exception {
+               runElasticsearchSinkTest();
        }
 
        @Test
-       public void testNullTransportClient() throws Exception {
-               runNullTransportClientTest();
+       public void testNullAddresses() throws Exception {
+               runNullAddressesTest();
        }
 
        @Test
-       public void testEmptyTransportClient() throws Exception {
-               runEmptyTransportClientTest();
+       public void testEmptyAddresses() throws Exception {
+               runEmptyAddressesTest();
        }
 
        @Test
-       public void testTransportClientFails() throws Exception{
-               runTransportClientFailsTest();
+       public void testInvalidElasticsearchCluster() throws Exception {
+               runInvalidElasticsearchClusterTest();
        }
 
        @Override
-       protected <T> ElasticsearchSinkBase<T> 
createElasticsearchSink(Map<String, String> userConfig,
-                                                                               
                                                List<InetSocketAddress> 
transportAddresses,
-                                                                               
                                                ElasticsearchSinkFunction<T> 
elasticsearchSinkFunction) {
-               return new ElasticsearchSink<>(userConfig, transportAddresses, 
elasticsearchSinkFunction);
+       protected ElasticsearchSinkBase<Tuple2<Integer, String>, 
TransportClient> createElasticsearchSink(
+                       int bulkFlushMaxActions,
+                       String clusterName,
+                       List<InetSocketAddress> transportAddresses,
+                       ElasticsearchSinkFunction<Tuple2<Integer, String>> 
elasticsearchSinkFunction) {
+
+               return new ElasticsearchSink<>(
+                               
Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+                               transportAddresses,
+                               elasticsearchSinkFunction);
+       }
+
+       @Override
+       protected ElasticsearchSinkBase<Tuple2<Integer, String>, 
TransportClient> createElasticsearchSinkForEmbeddedNode(
+                       int bulkFlushMaxActions,
+                       String clusterName,
+                       ElasticsearchSinkFunction<Tuple2<Integer, String>> 
elasticsearchSinkFunction) throws Exception {
+
+               return createElasticsearchSinkForNode(
+                               bulkFlushMaxActions, clusterName, 
elasticsearchSinkFunction, "127.0.0.1");
        }
 
        @Override
-       protected <T> ElasticsearchSinkBase<T> 
createElasticsearchSinkForEmbeddedNode(
-               Map<String, String> userConfig, ElasticsearchSinkFunction<T> 
elasticsearchSinkFunction) throws Exception {
+       protected ElasticsearchSinkBase<Tuple2<Integer, String>, 
TransportClient> createElasticsearchSinkForNode(
+                       int bulkFlushMaxActions,
+                       String clusterName,
+                       ElasticsearchSinkFunction<Tuple2<Integer, String>> 
elasticsearchSinkFunction,
+                       String ipAddress) throws Exception {
 
                List<InetSocketAddress> transports = new ArrayList<>();
-               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+               transports.add(new 
InetSocketAddress(InetAddress.getByName(ipAddress), 9300));
 
-               return new ElasticsearchSink<>(userConfig, transports, 
elasticsearchSinkFunction);
+               return new ElasticsearchSink<>(
+                               
Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+                               transports,
+                               elasticsearchSinkFunction);
        }
 }
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
index 7c4ba7a97f1..a3453ec4445 100644
--- 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
+++ 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
@@ -26,7 +26,6 @@
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
@@ -47,7 +46,7 @@
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.x.
  */
 @Internal
-public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge 
{
+public class Elasticsearch5ApiCallBridge implements 
ElasticsearchApiCallBridge<TransportClient> {
 
        private static final long serialVersionUID = -5222683870097809633L;
 
@@ -66,7 +65,7 @@
        }
 
        @Override
-       public Client createClient(Map<String, String> clientConfig) {
+       public TransportClient createClient(Map<String, String> clientConfig) {
                Settings settings = Settings.builder().put(clientConfig)
                        .put(NetworkModule.HTTP_TYPE_KEY, 
Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
                        .put(NetworkModule.TRANSPORT_TYPE_KEY, 
Netty3Plugin.NETTY_TRANSPORT_NAME)
@@ -89,6 +88,11 @@ public Client createClient(Map<String, String> clientConfig) 
{
                return transportClient;
        }
 
+       @Override
+       public BulkProcessor.Builder createBulkProcessorBuilder(TransportClient 
client, BulkProcessor.Listener listener) {
+               return BulkProcessor.builder(client, listener);
+       }
+
        @Override
        public Throwable 
extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
                if (!bulkItemResponse.isFailed()) {
@@ -123,10 +127,4 @@ public void configureBulkProcessorBackoff(
 
                builder.setBackoffPolicy(backoffPolicy);
        }
-
-       @Override
-       public void cleanup() {
-               // nothing to cleanup
-       }
-
 }
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
index 6c09337227a..b99b3539255 100644
--- 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
+++ 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
@@ -59,7 +59,7 @@
  * @param <T> Type of the elements handled by this sink
  */
 @PublicEvolving
-public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, 
TransportClient> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
index ad7c664cac7..67daa409b7b 100644
--- 
a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
@@ -18,58 +18,85 @@
 
 package org.apache.flink.streaming.connectors.elasticsearch5;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
 
+import org.elasticsearch.client.transport.TransportClient;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 /**
  * IT cases for the {@link ElasticsearchSink}.
+ *
+ * <p>The Elasticsearch ITCases for 5.x CANNOT be executed in the IDE 
directly, since the Log4J-to-SLF4J
+ * adapter dependency must be excluded from the test classpath for the 
embedded Elasticsearch
+ * node used in the tests to work properly.
  */
-public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+public class ElasticsearchSinkITCase extends 
ElasticsearchSinkTestBase<TransportClient, InetSocketAddress> {
 
        @Test
-       public void testTransportClient() throws Exception {
-               runTransportClientTest();
+       public void testElasticsearchSink() throws Exception {
+               runElasticsearchSinkTest();
        }
 
        @Test
-       public void testNullTransportClient() throws Exception {
-               runNullTransportClientTest();
+       public void testNullAddresses() throws Exception {
+               runNullAddressesTest();
        }
 
        @Test
-       public void testEmptyTransportClient() throws Exception {
-               runEmptyTransportClientTest();
+       public void testEmptyAddresses() throws Exception {
+               runEmptyAddressesTest();
        }
 
        @Test
-       public void testTransportClientFails() throws Exception {
-               runTransportClientFailsTest();
+       public void testInvalidElasticsearchCluster() throws Exception {
+               runInvalidElasticsearchClusterTest();
        }
 
        @Override
-       protected <T> ElasticsearchSinkBase<T> 
createElasticsearchSink(Map<String, String> userConfig,
-                                                                               
                                                List<InetSocketAddress> 
transportAddresses,
-                                                                               
                                                ElasticsearchSinkFunction<T> 
elasticsearchSinkFunction) {
-               return new ElasticsearchSink<>(userConfig, transportAddresses, 
elasticsearchSinkFunction);
+       protected ElasticsearchSinkBase<Tuple2<Integer, String>, 
TransportClient> createElasticsearchSink(
+                       int bulkFlushMaxActions,
+                       String clusterName,
+                       List<InetSocketAddress> addresses,
+                       ElasticsearchSinkFunction<Tuple2<Integer, String>> 
elasticsearchSinkFunction) {
+
+               return new ElasticsearchSink<>(
+                               
Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+                               addresses,
+                               elasticsearchSinkFunction);
        }
 
        @Override
-       protected <T> ElasticsearchSinkBase<T> 
createElasticsearchSinkForEmbeddedNode(
-               Map<String, String> userConfig, ElasticsearchSinkFunction<T> 
elasticsearchSinkFunction) throws Exception {
+       protected ElasticsearchSinkBase<Tuple2<Integer, String>, 
TransportClient> createElasticsearchSinkForEmbeddedNode(
+                       int bulkFlushMaxActions,
+                       String clusterName,
+                       ElasticsearchSinkFunction<Tuple2<Integer, String>> 
elasticsearchSinkFunction) throws Exception {
+
+               return createElasticsearchSinkForNode(
+                               bulkFlushMaxActions, clusterName, 
elasticsearchSinkFunction, "127.0.0.1");
+       }
+
+       @Override
+       protected ElasticsearchSinkBase<Tuple2<Integer, String>, 
TransportClient> createElasticsearchSinkForNode(
+                       int bulkFlushMaxActions,
+                       String clusterName,
+                       ElasticsearchSinkFunction<Tuple2<Integer, String>> 
elasticsearchSinkFunction,
+                       String ipAddress) throws Exception {
 
                List<InetSocketAddress> transports = new ArrayList<>();
-               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+               transports.add(new 
InetSocketAddress(InetAddress.getByName(ipAddress), 9300));
 
-               return new ElasticsearchSink<>(userConfig, transports, 
elasticsearchSinkFunction);
+               return new ElasticsearchSink<>(
+                               
Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+                               transports,
+                               elasticsearchSinkFunction);
        }
-
 }
diff --git a/flink-connectors/flink-connector-elasticsearch6/pom.xml 
b/flink-connectors/flink-connector-elasticsearch6/pom.xml
new file mode 100644
index 00000000000..ef06d80512b
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/pom.xml
@@ -0,0 +1,180 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.7-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
+       <name>flink-connector-elasticsearch6</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass a custom Elasticsearch client version -->
+       <properties>
+               <elasticsearch.version>6.3.1</elasticsearch.version>
+       </properties>
+
+       <dependencies>
+
+               <!-- core dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <exclusions>
+                               <!-- Elasticsearch Java Client has been moved 
to a different module in 5.x -->
+                               <exclusion>
+                                       <groupId>org.elasticsearch</groupId>
+                                       <artifactId>elasticsearch</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <!-- Dependency for Elasticsearch 6.x REST Client -->
+               <dependency>
+                       <groupId>org.elasticsearch.client</groupId>
+                       
<artifactId>elasticsearch-rest-high-level-client</artifactId>
+                       <version>${elasticsearch.version}</version>
+               </dependency>
+
+               <!--
+                       Elasticsearch 6.x, like 5.x, uses Log4j2 and no longer 
detects logging implementations, making
+                       Log4j2 a strict dependency. The following is added so 
that the Log4j2 API in
+                       Elasticsearch 6.x is routed to SLF4J. This way, user 
projects can remain flexible
+                       in their preferred logging implementation.
+               -->
+
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-to-slf4j</artifactId>
+                       <version>2.9.1</version>
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.elasticsearch</groupId>
+                                       <artifactId>elasticsearch</artifactId>
+                               </exclusion>
+                       </exclusions>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <!--
+                        Including the Elasticsearch transport dependency for 
tests. Netty3 is no longer available in 6.x.
+               -->
+
+               <dependency>
+                       <groupId>org.elasticsearch.client</groupId>
+                       <artifactId>transport</artifactId>
+                       <version>${elasticsearch.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.elasticsearch.plugin</groupId>
+                       <artifactId>transport-netty4-client</artifactId>
+                       <version>${elasticsearch.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <!--
+                       Including Log4j2 dependencies for tests is required for 
the
+                       embedded Elasticsearch nodes used in tests to run 
correctly.
+               -->
+
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-api</artifactId>
+                       <version>2.9.1</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-core</artifactId>
+                       <version>2.9.1</version>
+                       <scope>test</scope>
+               </dependency>
+
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <!--
+                               For the tests, we need to exclude the 
Log4j2-to-SLF4J adapter dependency
+                               and let Elasticsearch use Log4j2 directly, 
otherwise the embedded Elasticsearch node
+                               used in the tests will fail to work.
+
+                               In other words, the connector jar routes 
Elasticsearch 6.x's Log4j2 APIs to SLF4J,
+                               but for the test builds, we still stick to 
using Log4j2 directly.
+                       -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <version>2.12.2</version>
+                               <configuration>
+                                       <classpathDependencyExcludes>
+                                               
<classpathDependencyExclude>org.apache.logging.log4j:log4j-to-slf4j</classpathDependencyExclude>
+                                       </classpathDependencyExcludes>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>
diff --git 
a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
 
b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
new file mode 100644
index 00000000000..03bf9c07109
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 6 
and later versions.
+ */
+@Internal
+public class Elasticsearch6ApiCallBridge implements 
ElasticsearchApiCallBridge<RestHighLevelClient> {
+
+       private static final long serialVersionUID = -5222683870097809633L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(Elasticsearch6ApiCallBridge.class);
+
+       /**
+        * User-provided HTTP hosts to connect to.
+        */
+       private final List<HttpHost> httpHosts;
+
+       /**
+        * The factory to configure the rest client.
+        */
+       private final RestClientFactory restClientFactory;
+
+       Elasticsearch6ApiCallBridge(List<HttpHost> httpHosts, RestClientFactory 
restClientFactory) {
+               Preconditions.checkArgument(httpHosts != null && 
!httpHosts.isEmpty());
+               this.httpHosts = httpHosts;
+               this.restClientFactory = 
Preconditions.checkNotNull(restClientFactory);
+       }
+
+       @Override
+       public RestHighLevelClient createClient(Map<String, String> 
clientConfig) throws IOException {
+               RestClientBuilder builder = 
RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
+               restClientFactory.configureRestClientBuilder(builder);
+
+               RestHighLevelClient rhlClient = new 
RestHighLevelClient(builder);
+
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Pinging Elasticsearch cluster via hosts {} 
...", httpHosts);
+               }
+
+               if (!rhlClient.ping()) {
+                       throw new RuntimeException("There are no reachable 
Elasticsearch nodes!");
+               }
+
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Created Elasticsearch RestHighLevelClient 
connected to {}", httpHosts);
+               }
+
+               return rhlClient;
+       }
+
+       @Override
+       public BulkProcessor.Builder 
createBulkProcessorBuilder(RestHighLevelClient client, BulkProcessor.Listener 
listener) {
+               return BulkProcessor.builder(client::bulkAsync, listener);
+       }
+
+       @Override
+       public Throwable 
extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+               if (!bulkItemResponse.isFailed()) {
+                       return null;
+               } else {
+                       return bulkItemResponse.getFailure().getCause();
+               }
+       }
+
+       @Override
+       public void configureBulkProcessorBackoff(
+               BulkProcessor.Builder builder,
+               @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy 
flushBackoffPolicy) {
+
+               BackoffPolicy backoffPolicy;
+               if (flushBackoffPolicy != null) {
+                       switch (flushBackoffPolicy.getBackoffType()) {
+                               case CONSTANT:
+                                       backoffPolicy = 
BackoffPolicy.constantBackoff(
+                                               new 
TimeValue(flushBackoffPolicy.getDelayMillis()),
+                                               
flushBackoffPolicy.getMaxRetryCount());
+                                       break;
+                               case EXPONENTIAL:
+                               default:
+                                       backoffPolicy = 
BackoffPolicy.exponentialBackoff(
+                                               new 
TimeValue(flushBackoffPolicy.getDelayMillis()),
+                                               
flushBackoffPolicy.getMaxRetryCount());
+                       }
+               } else {
+                       backoffPolicy = BackoffPolicy.noBackoff();
+               }
+
+               builder.setBackoffPolicy(backoffPolicy);
+       }
+}
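
The line worth pausing on in the 6.x bridge is BulkProcessor.builder(client::bulkAsync,
listener). The 2.x/5.x bridges hand the TransportClient itself to BulkProcessor.builder;
the REST high-level client is not a Client, so the 6.x bridge instead passes its
bulkAsync method as the request consumer. A minimal standalone sketch of the same
adaptation (the helper class and method names are illustrative, not part of the PR):

    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.client.RestHighLevelClient;

    public final class RestBulkProcessorSketch {

        // Builds a BulkProcessor on top of the high-level REST client.
        // client::bulkAsync satisfies BiConsumer<BulkRequest, ActionListener<BulkResponse>>
        // because the trailing Header... vararg of bulkAsync may be left empty (6.3 API).
        public static BulkProcessor create(RestHighLevelClient client, BulkProcessor.Listener listener) {
            return BulkProcessor.builder(client::bulkAsync, listener).build();
        }

        private RestBulkProcessorSketch() {}
    }
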
diff --git 
a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
 
b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
new file mode 100644
index 00000000000..4e7a2635738
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.annotation.PublicEvolving;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch 6.x sink that requests multiple {@link ActionRequest 
ActionRequests}
+ * against a cluster for each incoming element.
+ *
+ * <p>The sink internally uses a {@link RestHighLevelClient} to communicate 
with an Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to via the provided 
{@link HttpHost HttpHosts} passed to the builder.
+ *
+ * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link 
ActionRequest ActionRequests}.
+ * This will buffer elements before sending a request to the cluster. The 
behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * <ul>
+ *   <li> {@code bulk.flush.max.actions}: Maximum number of elements to buffer
+ *   <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in 
megabytes) to buffer
+ *   <li> {@code bulk.flush.interval.ms}: Interval at which to flush data 
regardless of the other two
+ *   settings, in milliseconds
+ * </ul>
+ *
+ * <p>You also have to provide an {@link ElasticsearchSinkFunction}. This is 
used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the 
class level documentation of
+ * {@link ElasticsearchSinkFunction} for an example.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+@PublicEvolving
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, 
RestHighLevelClient> {
+
+       private static final long serialVersionUID = 1L;
+
+       private ElasticsearchSink(
+               Map<String, String> bulkRequestsConfig,
+               List<HttpHost> httpHosts,
+               ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+               ActionRequestFailureHandler failureHandler,
+               RestClientFactory restClientFactory) {
+
+               super(new Elasticsearch6ApiCallBridge(httpHosts, 
restClientFactory), bulkRequestsConfig, elasticsearchSinkFunction, 
failureHandler);
+       }
+
+       /**
+        * A builder for creating an {@link ElasticsearchSink}.
+        *
+        * @param <T> Type of the elements handled by the sink this builder 
creates.
+        */
+       @PublicEvolving
+       public static class Builder<T> {
+
+               private final List<HttpHost> httpHosts;
+               private final ElasticsearchSinkFunction<T> 
elasticsearchSinkFunction;
+
+               private Map<String, String> bulkRequestsConfig = new 
HashMap<>();
+               private ActionRequestFailureHandler failureHandler = new 
NoOpFailureHandler();
+               private RestClientFactory restClientFactory = restClientBuilder 
-> {};
+
+               /**
+                * Creates a new {@code ElasticsearchSink} that connects to the 
cluster using a {@link RestHighLevelClient}.
+                *
+                * @param httpHosts The list of {@link HttpHost} to which the 
{@link RestHighLevelClient} connects.
+                * @param elasticsearchSinkFunction This is used to generate 
multiple {@link ActionRequest ActionRequests} from the incoming element.
+                */
+               public Builder(List<HttpHost> httpHosts, 
ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+                       this.httpHosts = Preconditions.checkNotNull(httpHosts);
+                       this.elasticsearchSinkFunction = 
Preconditions.checkNotNull(elasticsearchSinkFunction);
+               }
+
+               /**
+                * Sets the maximum number of actions to buffer for each bulk 
request.
+                *
+                * @param numMaxActions the maximum number of actions to buffer 
per bulk request.
+                */
+               public void setBulkFlushMaxActions(int numMaxActions) {
+                       Preconditions.checkArgument(
+                               numMaxActions > 0,
+                               "Max number of buffered actions must be larger 
than 0.");
+
+                       
this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, 
String.valueOf(numMaxActions));
+               }
+
+               /**
+                * Sets the maximum size of buffered actions, in megabytes, 
per bulk request.
+                *
+                * @param maxSizeMb the maximum size of buffered actions, in 
megabytes.
+                */
+               public void setBulkFlushMaxSizeMb(int maxSizeMb) {
+                       Preconditions.checkArgument(
+                               maxSizeMb > 0,
+                               "Max size of buffered actions must be larger 
than 0.");
+
+                       
this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, 
String.valueOf(maxSizeMb));
+               }
+
+               /**
+                * Sets the bulk flush interval, in milliseconds.
+                *
+                * @param intervalMillis the bulk flush interval, in 
milliseconds.
+                */
+               public void setBulkFlushInterval(long intervalMillis) {
+                       Preconditions.checkArgument(
+                               intervalMillis >= 0,
+                               "Interval (in milliseconds) between each flush 
must be larger than or equal to 0.");
+
+                       
this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, 
String.valueOf(intervalMillis));
+               }
+
+               /**
+                * Sets whether or not to enable bulk flush backoff behaviour.
+                *
+                * @param enabled whether or not to enable backoffs.
+                */
+               public void setBulkFlushBackoff(boolean enabled) {
+                       
this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, 
String.valueOf(enabled));
+               }
+
+               /**
+                * Sets the type of backoff to use when flushing bulk 
+                *
+                * @param flushBackoffType the backoff type to use.
+                */
+               public void setBulkFlushBackoffType(FlushBackoffType 
flushBackoffType) {
+                       this.bulkRequestsConfig.put(
+                               CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE,
+                               
Preconditions.checkNotNull(flushBackoffType).toString());
+               }
+
+               /**
+                * Sets the maximum number of retries for a backoff attempt 
when flushing bulk requests.
+                *
+                * @param maxRetries the maximum number of retries for a 
backoff attempt when flushing bulk requests
+                */
+               public void setBulkFlushBackoffRetries(int maxRetries) {
+                       Preconditions.checkArgument(
+                               maxRetries > 0,
+                               "Max number of backoff attempts must be larger 
than 0.");
+
+                       
this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, 
String.valueOf(maxRetries));
+               }
+
+               /**
+                * Sets the amount of delay between each backoff attempt when 
flushing bulk requests, in milliseconds.
+                *
+                * @param delayMillis the amount of delay between each backoff 
attempt when flushing bulk requests, in milliseconds.
+                */
+               public void setBulkFlushBackoffDelay(long delayMillis) {
+                       Preconditions.checkArgument(
+                               delayMillis >= 0,
+                               "Delay (in milliseconds) between each backoff 
attempt must be larger than or equal to 0.");
+                       
this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, 
String.valueOf(delayMillis));
+               }
+
+               /**
+                * Sets a failure handler for action requests.
+                *
+                * @param failureHandler This is used to handle failed {@link 
ActionRequest}.
+                */
+               public void setFailureHandler(ActionRequestFailureHandler 
failureHandler) {
+                       this.failureHandler = 
Preconditions.checkNotNull(failureHandler);
+               }
+
+               /**
+                * Sets a REST client factory for custom client configuration.
+                *
+                * @param restClientFactory the factory that configures the 
rest client.
+                */
+               public void setRestClientFactory(RestClientFactory 
restClientFactory) {
+                       this.restClientFactory = 
Preconditions.checkNotNull(restClientFactory);
+               }
+
+               /**
+                * Creates the Elasticsearch sink.
+                *
+                * @return the created Elasticsearch sink.
+                */
+               public ElasticsearchSink<T> build() {
+                       return new ElasticsearchSink<>(bulkRequestsConfig, 
httpHosts, elasticsearchSinkFunction, failureHandler, restClientFactory);
+               }
+       }
+}
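
Taken together, a hedged usage sketch for the builder above (the index and type names,
the input stream, and the flush setting are illustrative, not part of the PR):

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

    import org.apache.http.HttpHost;
    import org.elasticsearch.client.Requests;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    DataStream<String> input = ...;

    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

    ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        (String element, RuntimeContext ctx, RequestIndexer indexer) -> {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);
            indexer.add(Requests.indexRequest()
                .index("my-index")    // illustrative index name
                .type("my-type")      // illustrative type name
                .source(json));
        });

    // flush after every element; see the bulk.flush.* keys documented above
    esSinkBuilder.setBulkFlushMaxActions(1);

    input.addSink(esSinkBuilder.build());
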
diff --git 
a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java
 
b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java
new file mode 100644
index 00000000000..4b74649ca87
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.elasticsearch.client.RestClientBuilder;
+
+import java.io.Serializable;
+
+/**
+ * A factory that is used to configure the {@link 
org.elasticsearch.client.RestHighLevelClient} internally
+ * used in the {@link ElasticsearchSink}.
+ */
+@PublicEvolving
+public interface RestClientFactory extends Serializable {
+
+       /**
+        * Configures the rest client builder.
+        *
+        * @param restClientBuilder the rest client builder to configure.
+        */
+       void configureRestClientBuilder(RestClientBuilder restClientBuilder);
+
+}
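
Since RestClientFactory is a single-method interface, a lambda is enough at the call
site. A hedged sketch against the builder from ElasticsearchSink above (the header,
path prefix, and timeout values are illustrative; setDefaultHeaders, setPathPrefix,
and setMaxRetryTimeoutMillis are options of the 6.x low-level RestClientBuilder):

    import org.apache.http.Header;
    import org.apache.http.message.BasicHeader;

    // esSinkBuilder is an ElasticsearchSink.Builder as in the usage sketch above
    esSinkBuilder.setRestClientFactory(restClientBuilder -> {
        restClientBuilder.setDefaultHeaders(
            new Header[] {new BasicHeader("Content-Type", "application/json")});
        restClientBuilder.setPathPrefix("/es");            // e.g. ES behind a proxy
        restClientBuilder.setMaxRetryTimeoutMillis(60000); // raise the retry timeout
    });
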
diff --git 
a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
 
b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
new file mode 100644
index 00000000000..8dc62168049
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.InternalSettingsPreparer;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.transport.Netty4Plugin;
+
+import java.io.File;
+import java.util.Collections;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for 
Elasticsearch 6.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for 
integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements 
EmbeddedElasticsearchNodeEnvironment {
+
+       private Node node;
+
+       @Override
+       public void start(File tmpDataFolder, String clusterName) throws 
Exception {
+               if (node == null) {
+                       Settings settings = Settings.builder()
+                               .put("cluster.name", clusterName)
+                               .put("http.enabled", true)
+                               .put("path.home", tmpDataFolder.getParent())
+                               .put("path.data", 
tmpDataFolder.getAbsolutePath())
+                               .build();
+
+                       node = new PluginNode(settings);
+                       node.start();
+               }
+       }
+
+       @Override
+       public void close() throws Exception {
+               if (node != null && !node.isClosed()) {
+                       node.close();
+                       node = null;
+               }
+       }
+
+       @Override
+       public Client getClient() {
+               if (node != null && !node.isClosed()) {
+                       return node.client();
+               } else {
+                       return null;
+               }
+       }
+
+       private static class PluginNode extends Node {
+               public PluginNode(Settings settings) {
+                       
super(InternalSettingsPreparer.prepareEnvironment(settings, null), 
Collections.<Class<? extends Plugin>>singletonList(Netty4Plugin.class));
+               }
+       }
+
+}
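
The implementation above deliberately lives in the
org.apache.flink.streaming.connectors.elasticsearch package under a fixed class name,
so that the version-agnostic test base can load it dynamically. A hedged sketch of
that lookup (the actual call site is in ElasticsearchSinkTestBase, outside this
excerpt):

    // Assumed reflective instantiation matching the fixed class name above.
    EmbeddedElasticsearchNodeEnvironment embeddedNodeEnv =
        (EmbeddedElasticsearchNodeEnvironment) Class.forName(
            "org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl")
                .newInstance();
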
diff --git 
a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
 
b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
new file mode 100644
index 00000000000..a6f01258940
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * IT cases for the {@link ElasticsearchSink}.
+ *
+ * <p>The Elasticsearch ITCases for 6.x CANNOT be executed in the IDE 
directly, since the Log4J-to-SLF4J
+ * adapter dependency must be excluded from the test classpath for the 
embedded Elasticsearch
+ * node used in the tests to work properly.
+ */
+public class ElasticsearchSinkITCase extends 
ElasticsearchSinkTestBase<RestHighLevelClient, HttpHost> {
+
+       @Test
+       public void testElasticsearchSink() throws Exception {
+               runElasticsearchSinkTest();
+       }
+
+       @Test
+       public void testNullAddresses() throws Exception {
+               runNullAddressesTest();
+       }
+
+       @Test
+       public void testEmptyAddresses() throws Exception {
+               runEmptyAddressesTest();
+       }
+
+       @Test
+       public void testInvalidElasticsearchCluster() throws Exception {
+               runInvalidElasticsearchClusterTest();
+       }
+
+       @Override
+       protected ElasticsearchSinkBase<Tuple2<Integer, String>, 
RestHighLevelClient> createElasticsearchSink(
+                       int bulkFlushMaxActions,
+                       String clusterName,
+                       List<HttpHost> httpHosts,
+                       ElasticsearchSinkFunction<Tuple2<Integer, String>> 
elasticsearchSinkFunction) {
+
+               ElasticsearchSink.Builder<Tuple2<Integer, String>> builder = 
new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);
+               builder.setBulkFlushMaxActions(bulkFlushMaxActions);
+
+               return builder.build();
+       }
+
+       @Override
+       protected ElasticsearchSinkBase<Tuple2<Integer, String>, 
RestHighLevelClient> createElasticsearchSinkForEmbeddedNode(
+                       int bulkFlushMaxActions,
+                       String clusterName,
+                       ElasticsearchSinkFunction<Tuple2<Integer, String>> 
elasticsearchSinkFunction) throws Exception {
+
+               return createElasticsearchSinkForNode(
+                               bulkFlushMaxActions, clusterName, 
elasticsearchSinkFunction, "127.0.0.1");
+       }
+
+       @Override
+       protected ElasticsearchSinkBase<Tuple2<Integer, String>, 
RestHighLevelClient> createElasticsearchSinkForNode(
+                       int bulkFlushMaxActions,
+                       String clusterName,
+                       ElasticsearchSinkFunction<Tuple2<Integer, String>> 
elasticsearchSinkFunction,
+                       String ipAddress) throws Exception {
+
+               ArrayList<HttpHost> httpHosts = new ArrayList<>();
+               httpHosts.add(new HttpHost(ipAddress, 9200, "http"));
+
+               ElasticsearchSink.Builder<Tuple2<Integer, String>> builder = 
new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);
+               builder.setBulkFlushMaxActions(bulkFlushMaxActions);
+
+               return builder.build();
+       }
+}
diff --git 
a/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
 
b/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
new file mode 100644
index 00000000000..fcd86546668
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 3afb779a80e..cacea91578e 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -50,6 +50,7 @@ under the License.
                <module>flink-connector-elasticsearch</module>
                <module>flink-connector-elasticsearch2</module>
                <module>flink-connector-elasticsearch5</module>
+               <module>flink-connector-elasticsearch6</module>
                <module>flink-connector-rabbitmq</module>
                <module>flink-connector-twitter</module>
                <module>flink-connector-nifi</module>
diff --git a/flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml 
b/flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml
new file mode 100644
index 00000000000..d170235c702
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-end-to-end-tests</artifactId>
+               <version>1.7-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-elasticsearch6-test</artifactId>
+       <name>flink-elasticsearch6-test</name>
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <version>3.0.0</version>
+                               <executions>
+                                       <execution>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <finalName>Elasticsearch6SinkExample</finalName>
+                                                       <artifactSet>
+                                                               <excludes>
+                                                                       <exclude>com.google.code.findbugs:jsr305</exclude>
+                                                               </excludes>
+                                                       </artifactSet>
+                                                       <filters>
+                                                               <filter>
+                                                                       <artifact>*:*</artifact>
+                                                                       <excludes>
+                                                                               <exclude>META-INF/*.SF</exclude>
+                                                                               <exclude>META-INF/*.DSA</exclude>
+                                                                               <exclude>META-INF/*.RSA</exclude>
+                                                                       </excludes>
+                                                               </filter>
+                                                       </filters>
+                                                       <transformers>
+                                                               <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       <mainClass>org.apache.flink.streaming.tests.Elasticsearch6SinkExample</mainClass>
+                                                               </transformer>
+                                                       </transformers>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>
diff --git a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
new file mode 100644
index 00000000000..dedcbb28f08
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End-to-end test for the Elasticsearch 6 sink.
+ */
+public class Elasticsearch6SinkExample {
+
+       public static void main(String[] args) throws Exception {
+
+               final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+               if (parameterTool.getNumberOfParameters() < 3) {
+                       System.out.println("Missing parameters!\n" +
+                               "Usage: --numRecords <numRecords> --index <index> --type <type>");
+                       return;
+               }
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().disableSysoutLogging();
+               env.enableCheckpointing(5000);
+
+               DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+                       .map(new MapFunction<Long, String>() {
+                               @Override
+                               public String map(Long value) throws Exception {
+                                       return "message #" + value;
+                               }
+                       });
+
+               List<HttpHost> httpHosts = new ArrayList<>();
+               httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
+
+               ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
+                       httpHosts,
+                       (String element, RuntimeContext ctx, RequestIndexer indexer) -> indexer.add(createIndexRequest(element, parameterTool)));
+
+               // this instructs the sink to emit after every element, otherwise they would be buffered
+               esSinkBuilder.setBulkFlushMaxActions(1);
+
+               source.addSink(esSinkBuilder.build());
+
+               env.execute("Elasticsearch 6.x end to end sink test example");
+       }
+
+       private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
+               Map<String, Object> json = new HashMap<>();
+               json.put("data", element);
+
+               return Requests.indexRequest()
+                       .index(parameterTool.getRequired("index"))
+                       .type(parameterTool.getRequired("type"))
+                       .id(element)
+                       .source(json);
+       }
+}
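One caveat about the lambda passed to the builder above: serializable SAM lambdas can occasionally trip Flink's type extraction, in which case an anonymous `ElasticsearchSinkFunction` is the drop-in equivalent. A hedged sketch against this same example class (it assumes `httpHosts` and `parameterTool` as defined in `main`):

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

    // Anonymous-class form of the sink function; behaves the same as the lambda,
    // but gives the type extractor explicit generics to work with.
    ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ElasticsearchSinkFunction<String>() {
            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(element, parameterTool));
            }
        });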
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 4abf59509df..5752d9afced 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -48,6 +48,7 @@ under the License.
                <module>flink-elasticsearch1-test</module>
                <module>flink-elasticsearch2-test</module>
                <module>flink-elasticsearch5-test</module>
+               <module>flink-elasticsearch6-test</module>
                <module>flink-quickstart-test</module>
                <module>flink-confluent-schema-registry</module>
                <module>flink-stream-state-ttl-test</module>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index dc8424f25ec..431c21ee775 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -96,6 +96,7 @@ run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR
 run_test "Elasticsearch (v1.7.1) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1 https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz"
 run_test "Elasticsearch (v2.3.5) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2 https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"
 run_test "Elasticsearch (v5.1.2) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz"
+run_test "Elasticsearch (v6.3.1) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 6 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz"
 
 run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java"
 run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala"
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
index 7b627fe9de4..fa6c33124be 100644
--- a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -42,15 +42,22 @@ function setup_elasticsearch {
 }
 
 function verify_elasticsearch_process_exist {
-    local elasticsearchProcess=$(jps | grep Elasticsearch | awk '{print $2}')
-
-    # make sure the elasticsearch node is actually running
-    if [ "$elasticsearchProcess" != "Elasticsearch" ]; then
-      echo "Elasticsearch node is not running."
-      exit 1
-    else
-      echo "Elasticsearch node is running."
-    fi
+    for ((i=1;i<=10;i++)); do
+        local elasticsearchProcess=$(jps | grep Elasticsearch | awk '{print $2}')
+
+        echo "Waiting for Elasticsearch node to start ..."
+
+        # make sure the elasticsearch node is actually running
+        if [ "$elasticsearchProcess" != "Elasticsearch" ]; then
+            sleep 1
+        else
+            echo "Elasticsearch node is running."
+            return
+        fi
+    done
+
+    echo "Elasticsearch node did not start properly"
+    exit 1
 }
 
 function verify_result {
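The retry loop above is the substantive change in this file: instead of a single jps probe, the script now gives the node up to ten seconds to come up before failing. For anyone wiring the same bounded-retry readiness check programmatically, here is a hedged sketch against the Elasticsearch 6.x REST client; it is not part of this PR, the host, port, and retry budget are assumptions, and a `throws Exception` context is assumed for the sleep:

    import org.apache.http.HttpHost;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;

    import java.io.IOException;

    // Poll the node until it answers, instead of failing on the first check.
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")))) {
        boolean up = false;
        for (int i = 0; i < 10 && !up; i++) {
            try {
                up = client.ping();    // true once the node responds
            } catch (IOException e) {
                Thread.sleep(1000L);   // not reachable yet; retry
            }
        }
        if (!up) {
            throw new IllegalStateException("Elasticsearch node did not start properly");
        }
    }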
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
index 7464409f41e..c8cd2db17c9 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
@@ -32,9 +32,6 @@ start_cluster
 
 function test_cleanup {
   shutdown_elasticsearch_cluster index
-
-  # make sure to run regular cleanup as well
-   cleanup
 }
 
 trap test_cleanup INT
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index c4620c837af..ae5e58c2f76 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -88,6 +88,7 @@ flink-connectors/flink-connector-cassandra,\
 flink-connectors/flink-connector-elasticsearch,\
 flink-connectors/flink-connector-elasticsearch2,\
 flink-connectors/flink-connector-elasticsearch5,\
+flink-connectors/flink-connector-elasticsearch6,\
 flink-connectors/flink-connector-elasticsearch-base,\
 flink-connectors/flink-connector-filesystem,\
 flink-connectors/flink-connector-kafka-0.8,\


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
