[FLINK-4988] Elasticsearch 5.x support

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8699b03d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8699b03d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8699b03d

Branch: refs/heads/master
Commit: 8699b03d79a441ca33d9f62b96490d29a0efaf44
Parents: b452c8b
Author: Mike Dias <[email protected]>
Authored: Mon Nov 7 18:09:48 2016 -0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue Feb 7 22:45:45 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/elasticsearch5.md           | 146 +++++++++++
 docs/dev/connectors/filesystem_sink.md          |   2 +-
 docs/dev/connectors/index.md                    |   1 -
 docs/dev/connectors/nifi.md                     |   2 +-
 docs/dev/connectors/rabbitmq.md                 |   2 +-
 docs/dev/connectors/twitter.md                  |   2 +-
 .../flink-connector-elasticsearch5/pom.xml      |  93 +++++++
 .../elasticsearch5/BulkProcessorIndexer.java    |  35 +++
 .../elasticsearch5/ElasticsearchSink.java       | 259 +++++++++++++++++++
 .../ElasticsearchSinkFunction.java              |  60 +++++
 .../elasticsearch5/RequestIndexer.java          |  25 ++
 .../elasticsearch5/ElasticsearchSinkITCase.java | 200 ++++++++++++++
 .../examples/ElasticsearchExample.java          |  83 ++++++
 .../src/test/resources/log4j2.properties        |  27 ++
 .../src/test/resources/logback-test.xml         |  30 +++
 flink-connectors/pom.xml                        |   3 +-
 16 files changed, 964 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/docs/dev/connectors/elasticsearch5.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch5.md 
b/docs/dev/connectors/elasticsearch5.md
new file mode 100644
index 0000000..2673d86
--- /dev/null
+++ b/docs/dev/connectors/elasticsearch5.md
@@ -0,0 +1,146 @@
+---
+title: "Elasticsearch 5.x Connector"
+nav-title: Elasticsearch 5.x
+nav-parent_id: connectors
+nav-pos: 6
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This connector provides a Sink that can write to an
+[Elasticsearch 5.x](https://elastic.co/) Index. To use this connector, add the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-elasticsearch5{{ site.scala_version_suffix 
}}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
+for information about how to package the program with the libraries for
+cluster execution.
+
+#### Installing Elasticsearch 5.x
+
+Instructions for setting up an Elasticsearch cluster can be found
+    
[here](https://www.elastic.co/guide/en/elasticsearch/reference/5.x/setup.html).
+Make sure to set and remember a cluster name. This must be set when
+creating a Sink for writing to your cluster.
+
+#### Elasticsearch 5.x Sink
+The connector provides a Sink that can send data to an Elasticsearch 5.x Index.
+
+The sink communicates with Elasticsearch via the Transport Client.
+
+See 
[here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.x/transport-client.html)
+for information about the Transport Client.
+
+The code below shows how to create a sink that uses a `TransportClient` for 
communication:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+File dataDir = ....;
+
+DataStream<String> input = ...;
+
+Map<String, String> esConfig = new HashMap<>();
+esConfig.put("cluster.name", "my-cluster-name");
+
+// This instructs the sink to emit after every element, otherwise they would 
be buffered
+Map<String, String> sinkConfig = new HashMap<>();
+sinkConfig.put("bulk.flush.max.actions", "1");
+
+List<InetSocketAddress> transports = new ArrayList<>();
+transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 
9300));
+transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
+
+input.addSink(new ElasticsearchSink(esConfig, sinkConfig, transports, new 
ElasticsearchSinkFunction<String>() {
+  public IndexRequest createIndexRequest(String element) {
+    Map<String, String> json = new HashMap<>();
+    json.put("data", element);
+
+    return Requests.indexRequest()
+            .index("my-index")
+            .type("my-type")
+            .source(json);
+  }
+
+  @Override
+  public void process(String element, RuntimeContext ctx, RequestIndexer 
indexer) {
+    indexer.add(createIndexRequest(element));
+  }
+}));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val dataDir = ....;
+
+val input: DataStream[String] = ...
+
+val esConfig = new util.HashMap[String, String]
+esConfig.put("cluster.name", "my-cluster-name")
+
+val sinkConfig = new util.HashMap[String, String]
+sinkConfig.put("bulk.flush.max.actions", "1")
+
+val transports = new util.ArrayList[InetSocketAddress]
+transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
+transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
+
+input.addSink(new ElasticsearchSink(esConfig, sinkConfig, transports, new 
ElasticsearchSinkFunction[String] {
+  def createIndexRequest(element: String): IndexRequest = {
+    val json = new util.HashMap[String, AnyRef]
+    json.put("data", element)
+    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
+  }
+
+  override def process(element: String, ctx: RuntimeContext, indexer: 
RequestIndexer) {
+    indexer.add(createIndexRequest(element))
+  }
+}))
+{% endhighlight %}
+</div>
+</div>
+
+The first Map of Strings is used to configure the Transport Client. The 
configuration keys
+are documented in the Elasticsearch documentation
+[here](https://www.elastic.co/guide/en/elasticsearch/reference/5.x/index.html).
+Especially important is the `cluster.name` parameter, which must correspond to
+the name of your cluster.
+
+The second Map of Strings is used to configure a `BulkProcessor` to send 
Action requests to the cluster.
+This will buffer elements and Action Requests before sending to the cluster. 
The behaviour of the
+`BulkProcessor` can be configured using these config keys:
+ * **bulk.flush.max.actions**: Maximum amount of elements to buffer
+ * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
+ * **bulk.flush.interval.ms**: Interval at which to flush data regardless of 
the other two
+  settings in milliseconds
+
+The list of `InetSocketAddress`es passed as the third argument names the Elasticsearch nodes
+to which the sink should connect via a `TransportClient`.
+
+More information about Elasticsearch can be found [here](https://elastic.co).
+

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/docs/dev/connectors/filesystem_sink.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/filesystem_sink.md 
b/docs/dev/connectors/filesystem_sink.md
index 0fa8bb1..bcaeb17 100644
--- a/docs/dev/connectors/filesystem_sink.md
+++ b/docs/dev/connectors/filesystem_sink.md
@@ -2,7 +2,7 @@
 title: "HDFS Connector"
 nav-title: Rolling File Sink
 nav-parent_id: connectors
-nav-pos: 6
+nav-pos: 7
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/docs/dev/connectors/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/index.md b/docs/dev/connectors/index.md
index ec0725a..f5c3eec 100644
--- a/docs/dev/connectors/index.md
+++ b/docs/dev/connectors/index.md
@@ -31,7 +31,6 @@ Currently these systems are supported: (Please select the 
respective documentati
 
  * [Apache Kafka](https://kafka.apache.org/) (sink/source)
  * [Elasticsearch](https://elastic.co/) (sink)
- * [Elasticsearch 2x](https://elastic.co/) (sink)
  * [Hadoop FileSystem](http://hadoop.apache.org) (sink)
  * [RabbitMQ](http://www.rabbitmq.com/) (sink/source)
  * [Amazon Kinesis Streams](http://aws.amazon.com/kinesis/streams/) 
(sink/source)

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/docs/dev/connectors/nifi.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/nifi.md b/docs/dev/connectors/nifi.md
index aa9eba2..8223867 100644
--- a/docs/dev/connectors/nifi.md
+++ b/docs/dev/connectors/nifi.md
@@ -2,7 +2,7 @@
 title: "Apache NiFi Connector"
 nav-title: NiFi
 nav-parent_id: connectors
-nav-pos: 8
+nav-pos: 9
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/docs/dev/connectors/rabbitmq.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/rabbitmq.md b/docs/dev/connectors/rabbitmq.md
index c94c99d..b4da248 100644
--- a/docs/dev/connectors/rabbitmq.md
+++ b/docs/dev/connectors/rabbitmq.md
@@ -2,7 +2,7 @@
 title: "RabbitMQ Connector"
 nav-title: RabbitMQ
 nav-parent_id: connectors
-nav-pos: 7
+nav-pos: 8
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/docs/dev/connectors/twitter.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/twitter.md b/docs/dev/connectors/twitter.md
index be15aaf..f1fbbd4 100644
--- a/docs/dev/connectors/twitter.md
+++ b/docs/dev/connectors/twitter.md
@@ -2,7 +2,7 @@
 title: "Twitter Connector"
 nav-title: Twitter
 nav-parent_id: connectors
-nav-pos: 9
+nav-pos: 10
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/pom.xml 
b/flink-connectors/flink-connector-elasticsearch5/pom.xml
new file mode 100644
index 0000000..8fc5c8b
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch5/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.2-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
+       <name>flink-connector-elasticsearch5</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->
+       <properties>
+               <elasticsearch.version>5.0.0</elasticsearch.version>
+       </properties>
+
+       <dependencies>
+
+               <!-- core dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.elasticsearch.client</groupId>
+                       <artifactId>transport</artifactId>
+                       <version>${elasticsearch.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-api</artifactId>
+                       <version>2.7</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-core</artifactId>
+                       <version>2.7</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.fasterxml.jackson.core</groupId>
+                       <artifactId>jackson-core</artifactId>
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+       </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java
new file mode 100644
index 0000000..f7ca499
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/BulkProcessorIndexer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+
+/**
+ * {@link RequestIndexer} that forwards every {@link ActionRequest} it is given to a
+ * {@link BulkProcessor}.
+ */
+public class BulkProcessorIndexer implements RequestIndexer {
+       private final BulkProcessor bulkProcessor;
+
+       /**
+        * Creates an indexer backed by the given processor.
+        *
+        * @param bulkProcessor the processor that receives all added requests
+        */
+       public BulkProcessorIndexer(BulkProcessor bulkProcessor) {
+               this.bulkProcessor = bulkProcessor;
+       }
+
+       /**
+        * Adds the given requests to the wrapped {@link BulkProcessor}, one by one and in
+        * argument order.
+        *
+        * @param actionRequests the requests to add
+        */
+       @Override
+       public void add(ActionRequest... actionRequests) {
+               final int count = actionRequests.length;
+               for (int i = 0; i < count; i++) {
+                       bulkProcessor.add(actionRequests[i]);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
new file mode 100644
index 0000000..29c69c4
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.transport.Netty3Plugin;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Sink that emits its input elements in bulk to an Elasticsearch cluster.
+ * <p>
+ * <p>
+ * The first {@link Map} passed to the constructor is forwarded to 
Elasticsearch when creating
+ * {@link TransportClient}. The config keys can be found in the Elasticsearch
+ * documentation. An important setting is {@code cluster.name}, this should be 
set to the name
+ * of the cluster that the sink should emit to.
+ * <p>
+ * <b>Attention: </b> When using the {@code TransportClient} the sink will 
fail if no cluster
+ * can be connected to.
+ * <p>
+ * The second {@link Map} is used to configure a {@link BulkProcessor} to send 
{@link IndexRequest IndexRequests}.
+ * This will buffer elements before sending a request to the cluster. The 
behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * <ul>
+ * <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer
+ * <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) 
to buffer
+ * <li> {@code bulk.flush.interval.ms}: Interval at which to flush data 
regardless of the other two
+ * settings in milliseconds
+ * </ul>
+ * <p>
+ * <p>
+ * You also have to provide an {@link RequestIndexer}. This is used to create 
an
+ * {@link IndexRequest} from an element that needs to be added to 
Elasticsearch. See
+ * {@link RequestIndexer} for an example.
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public class ElasticsearchSink<T> extends RichSinkFunction<T> {
+
+       public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = 
"bulk.flush.max.actions";
+       public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = 
"bulk.flush.max.size.mb";
+       public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = 
"bulk.flush.interval.ms";
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ElasticsearchSink.class);
+
+       /**
+        * The user specified config map that we forward to Elasticsearch when 
we create the Client.
+        */
+       private final Map<String, String> esConfig;
+
+       /**
+        * The user specified config map that we use to configure BulkProcessor.
+        */
+       private final Map<String, String> sinkConfig;
+
+       /**
+        * The list of nodes that the TransportClient should connect to. This 
is null if we are using
+        * an embedded Node to get a Client.
+        */
+       private final List<InetSocketAddress> transportAddresses;
+
+       /**
+        * The builder that is used to construct an {@link IndexRequest} from 
the incoming element.
+        */
+       private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+       /**
+        * The Client that was either retrieved from a Node or is a 
TransportClient.
+        */
+       private transient Client client;
+
+       /**
+        * Bulk processor that was created using the client
+        */
+       private transient BulkProcessor bulkProcessor;
+
+       /**
+        * Bulk {@link org.elasticsearch.action.ActionRequest} indexer
+        */
+       private transient RequestIndexer requestIndexer;
+
+       /**
+        * This is set from inside the BulkProcessor listener if there where 
failures in processing.
+        */
+       private final AtomicBoolean hasFailure = new AtomicBoolean(false);
+
+       /**
+        * This is set from inside the BulkProcessor listener if a Throwable 
was thrown during processing.
+        */
+       private final AtomicReference<Throwable> failureThrowable = new 
AtomicReference<>();
+
+       /**
+        * Creates a new ElasticsearchSink that connects to the cluster using a 
TransportClient.
+        *
+        * @param esConfig                  The map of user settings that are 
passed when constructing the TransportClient
+        * @param sinkConfig                The map of user settings that are 
passed when constructing the BulkProcessor
+        * @param transportAddresses        The Elasticsearch Nodes to which to 
connect using a {@code TransportClient}
+        * @param elasticsearchSinkFunction This is used to generate the 
ActionRequest from the incoming element
+        */
+       public ElasticsearchSink(Map<String, String> esConfig, Map<String, 
String> sinkConfig, List<InetSocketAddress> transportAddresses, 
ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+               this.esConfig = esConfig;
+               this.sinkConfig = sinkConfig;
+               this.elasticsearchSinkFunction = elasticsearchSinkFunction;
+               Preconditions.checkArgument(transportAddresses != null && 
transportAddresses.size() > 0);
+               this.transportAddresses = transportAddresses;
+       }
+
+       /**
+        * Initializes the connection to Elasticsearch by creating a
+        * {@link org.elasticsearch.client.transport.TransportClient}.
+        */
+       @Override
+       public void open(Configuration configuration) {
+               List<TransportAddress> transportNodes;
+               transportNodes = new ArrayList<>(transportAddresses.size());
+               for (InetSocketAddress address : transportAddresses) {
+                       transportNodes.add(new 
InetSocketTransportAddress(address));
+               }
+
+               Settings settings = Settings.builder().put(esConfig)
+                       .put(NetworkModule.HTTP_TYPE_KEY, 
Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
+                       .put(NetworkModule.TRANSPORT_TYPE_KEY, 
Netty3Plugin.NETTY_TRANSPORT_NAME)
+                       .build();
+
+               TransportClient transportClient = new 
PreBuiltTransportClient(settings);
+               for (TransportAddress transport : transportNodes) {
+                       transportClient.addTransportAddress(transport);
+               }
+
+               // verify that we actually are connected to a cluster
+               if (transportClient.connectedNodes().isEmpty()) {
+                       throw new RuntimeException("Client is not connected to 
any Elasticsearch nodes!");
+               }
+
+               client = transportClient;
+
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Created Elasticsearch TransportClient {}", 
client);
+               }
+
+               BulkProcessor.Builder bulkProcessorBuilder = 
BulkProcessor.builder(client, new BulkProcessor.Listener() {
+                       @Override
+                       public void beforeBulk(long executionId, BulkRequest 
request) {
+
+                       }
+
+                       @Override
+                       public void afterBulk(long executionId, BulkRequest 
request, BulkResponse response) {
+                               if (response.hasFailures()) {
+                                       for (BulkItemResponse itemResp : 
response.getItems()) {
+                                               if (itemResp.isFailed()) {
+                                                       LOG.error("Failed to 
index document in Elasticsearch: " + itemResp.getFailureMessage());
+                                                       
failureThrowable.compareAndSet(null, new 
RuntimeException(itemResp.getFailureMessage()));
+                                               }
+                                       }
+                                       hasFailure.set(true);
+                               }
+                       }
+
+                       @Override
+                       public void afterBulk(long executionId, BulkRequest 
request, Throwable failure) {
+                               LOG.error(failure.getMessage());
+                               failureThrowable.compareAndSet(null, failure);
+                               hasFailure.set(true);
+                       }
+               });
+
+               // This makes flush() blocking
+               bulkProcessorBuilder.setConcurrentRequests(0);
+
+               ParameterTool params = ParameterTool.fromMap(sinkConfig);
+
+               if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
+                       
bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
+               }
+
+               if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
+                       bulkProcessorBuilder.setBulkSize(new 
ByteSizeValue(params.getInt(
+                               CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), 
ByteSizeUnit.MB));
+               }
+
+               if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
+                       
bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
+               }
+
+               bulkProcessor = bulkProcessorBuilder.build();
+               requestIndexer = new BulkProcessorIndexer(bulkProcessor);
+       }
+
+       @Override
+       public void invoke(T element) {
+               elasticsearchSinkFunction.process(element, getRuntimeContext(), 
requestIndexer);
+       }
+
+       @Override
+       public void close() {
+               if (bulkProcessor != null) {
+                       bulkProcessor.close();
+                       bulkProcessor = null;
+               }
+
+               if (client != null) {
+                       client.close();
+               }
+
+               if (hasFailure.get()) {
+                       Throwable cause = failureThrowable.get();
+                       if (cause != null) {
+                               throw new RuntimeException("An error occurred 
in ElasticsearchSink.", cause);
+                       } else {
+                               throw new RuntimeException("An error occurred 
in ElasticsearchSink.");
+                       }
+               }
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java
new file mode 100644
index 0000000..752a83e
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkFunction.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.io.Serializable;
+
+/**
+ * Creates {@link org.elasticsearch.action.ActionRequest ActionRequests} from an element in a stream
+ * and adds them to a {@link RequestIndexer}.
+ *
+ * <p>
+ * This is used by {@link ElasticsearchSink} to prepare elements for sending them to Elasticsearch.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ *     private static class TestElasticSearchSinkFunction implements
+ *             ElasticsearchSinkFunction<Tuple2<Integer, String>> {
+ *
+ *         public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
+ *             Map<String, Object> json = new HashMap<>();
+ *             json.put("data", element.f1);
+ *
+ *             return Requests.indexRequest()
+ *                 .index("my-index")
+ *                 .type("my-type")
+ *                 .id(element.f0.toString())
+ *                 .source(json);
+ *         }
+ *
+ *         public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
+ *             indexer.add(createIndexRequest(element));
+ *         }
+ *     }
+ * }</pre>
+ *
+ * @param <T> The type of the element handled by this {@code ElasticsearchSinkFunction}
+ */
+public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
+       /**
+        * Processes one stream element; {@link ElasticsearchSink} calls this for every element it receives.
+        *
+        * @param element the stream element to process
+        * @param ctx the runtime context passed in by the sink
+        * @param indexer the indexer to which the requests created for the element are added
+        */
+       void process(T element, RuntimeContext ctx, RequestIndexer indexer);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java
new file mode 100644
index 0000000..170df31
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/RequestIndexer.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import org.elasticsearch.action.ActionRequest;
+
+import java.io.Serializable;
+
+/**
+ * Receives the {@link ActionRequest ActionRequests} that an {@link ElasticsearchSinkFunction}
+ * creates for the elements of a stream.
+ */
+public interface RequestIndexer extends Serializable {
+       /**
+        * Adds the given requests to this indexer.
+        *
+        * @param actionRequests the requests to add
+        */
+       void add(ActionRequest... actionRequests);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
new file mode 100644
index 0000000..b4a370b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.internal.InternalSettingsPreparer;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.transport.Netty3Plugin;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for the Elasticsearch 5.x {@link ElasticsearchSink}.
+ */
+public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase {
+
+       private static final int NUM_ELEMENTS = 20;
+
+       @ClassRule
+       public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+       /**
+        * Runs a job against an embedded node and verifies that every emitted element was indexed.
+        */
+       @Test
+       public void testTransportClient() throws Exception {
+
+               File dataDir = tempFolder.newFolder();
+
+               Settings settings = Settings.builder()
+                       .put("cluster.name", "my-transport-client-cluster")
+                       .put("http.enabled", false)
+                       .put("path.home", dataDir.getParent())
+                       .put("path.data", dataDir.getAbsolutePath())
+                       .put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
+                       .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
+                       .build();
+
+               Node node = new PluginNode(settings);
+               try {
+                       node.start();
+
+                       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+                       DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
+
+                       Map<String, String> esConfig = ImmutableMap.of("cluster.name", "my-transport-client-cluster");
+                       Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+                       List<InetSocketAddress> transports = new ArrayList<>();
+                       transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+                       source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, transports, new TestElasticsearchSinkFunction()));
+
+                       env.execute("Elasticsearch TransportClient Test");
+
+                       // verify the results
+                       Client client = node.client();
+                       for (int i = 0; i < NUM_ELEMENTS; i++) {
+                               GetResponse response = client.prepareGet("my-index", "my-type", Integer.toString(i)).get();
+                               assertEquals("message #" + i, response.getSource().get("data"));
+                       }
+               } finally {
+                       // always shut the embedded node down, also when the job or an assertion fails;
+                       // otherwise it stays bound to port 9300 and testTransportClientFails could connect to it
+                       node.close();
+               }
+       }
+
+       /**
+        * Creating the sink with {@code null} transport addresses must fail with an
+        * {@link IllegalArgumentException}.
+        */
+       @Test(expected = IllegalArgumentException.class)
+       public void testNullTransportClient() throws Exception {
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
+
+               Map<String, String> esConfig = ImmutableMap.of("cluster.name", "my-transport-client-cluster");
+               Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+               source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, null, new TestElasticsearchSinkFunction()));
+
+               fail();
+       }
+
+       /**
+        * Using the sink with an empty list of transport addresses must fail with an
+        * {@link IllegalArgumentException}.
+        */
+       @Test(expected = IllegalArgumentException.class)
+       public void testEmptyTransportClient() throws Exception {
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
+
+               Map<String, String> esConfig = ImmutableMap.of("cluster.name", "my-transport-client-cluster");
+               Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+               source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, new ArrayList<InetSocketAddress>(), new TestElasticsearchSinkFunction()));
+
+               env.execute("Elasticsearch TransportClient Test");
+
+               fail();
+       }
+
+       /**
+        * The job must fail when there is no cluster to connect to.
+        */
+       @Test(expected = JobExecutionException.class)
+       public void testTransportClientFails() throws Exception {
+               // this checks whether the TransportClient fails early when there is no cluster to
+               // connect to; no node is started here, so nothing is expected to listen on port 9300
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
+
+               Map<String, String> esConfig = ImmutableMap.of("cluster.name", "my-transport-client-cluster");
+               Map<String, String> sinkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+               List<InetSocketAddress> transports = new ArrayList<>();
+               transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+               source.addSink(new ElasticsearchSink<>(esConfig, sinkConfig, transports, new TestElasticsearchSinkFunction()));
+
+               env.execute("Elasticsearch TransportClient Test");
+
+               fail();
+       }
+
+       /**
+        * Source that emits {@code NUM_ELEMENTS} tuples of the form {@code (i, "message #i")}.
+        */
+       private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
+               private static final long serialVersionUID = 1L;
+
+               private volatile boolean running = true;
+
+               @Override
+               public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
+                       for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+                               ctx.collect(Tuple2.of(i, "message #" + i));
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+       }
+
+       /**
+        * Sink function that indexes each tuple into "my-index"/"my-type", using the tuple's first field
+        * as the document id and its second field as the "data" property.
+        */
+       private static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<Tuple2<Integer, String>> {
+               private static final long serialVersionUID = 1L;
+
+               public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
+                       Map<String, Object> json = new HashMap<>();
+                       json.put("data", element.f1);
+
+                       return Requests.indexRequest()
+                               .index("my-index")
+                               .type("my-type")
+                               .id(element.f0.toString())
+                               .source(json);
+               }
+
+               @Override
+               public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
+                       indexer.add(createIndexRequest(element));
+               }
+       }
+
+       /**
+        * Node that is created with the {@link Netty3Plugin} in addition to the given settings.
+        */
+       private static class PluginNode extends Node {
+               public PluginNode(Settings settings) {
+                       super(InternalSettingsPreparer.prepareEnvironment(settings, null), Collections.<Class<? extends Plugin>>singletonList(Netty3Plugin.class));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java
new file mode 100644
index 0000000..47ce846
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchExample.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5.examples;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+import 
org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch5.RequestIndexer;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Demonstrates how to use the Elasticsearch Sink. A cluster named "elasticsearch" must be running
+ * before this example is started; otherwise change the cluster name in the config map.
+ */
+public class ElasticsearchExample {
+
+       public static void main(String[] args) throws Exception {
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+               SingleOutputStreamOperator<String> messages = env
+                       .generateSequence(0, 20)
+                       .map(new MapFunction<Long, String>() {
+                               @Override
+                               public String map(Long value) throws Exception {
+                                       return "message #" + value;
+                               }
+                       });
+
+               Map<String, String> clusterConfig = ImmutableMap.of("cluster.name", "elasticsearch");
+
+               // Flush after every single element; without this setting the requests would be buffered
+               Map<String, String> bulkConfig = ImmutableMap.of(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+               List<InetSocketAddress> transportAddresses = new ArrayList<>();
+               transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+               ElasticsearchSinkFunction<String> indexingFunction = new ElasticsearchSinkFunction<String>() {
+                       @Override
+                       public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+                               indexer.add(createIndexRequest(element));
+                       }
+               };
+
+               messages.addSink(new ElasticsearchSink<>(clusterConfig, bulkConfig, transportAddresses, indexingFunction));
+
+               env.execute("Elasticsearch Example");
+       }
+
+       /**
+        * Builds the index request for one message: the message is stored under the "data" key and is
+        * also used as the document id.
+        */
+       private static IndexRequest createIndexRequest(String element) {
+               Map<String, Object> document = new HashMap<>();
+               document.put("data", element);
+
+               IndexRequest request = Requests.indexRequest();
+               request.index("my-index");
+               request.type("my-type");
+               request.id(element);
+               request.source(document);
+               return request;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties
 
b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..dc20726
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/flink-connector-elasticsearch5/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/test/resources/logback-test.xml
 
b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8893f7c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch5/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming.connectors.elasticsearch5" 
level="WARN"/>
+</configuration>

http://git-wip-us.apache.org/repos/asf/flink/blob/8699b03d/flink-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 91ee6af..e19c77f 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -47,6 +47,7 @@ under the License.
                <module>flink-connector-kafka-0.10</module>
                <module>flink-connector-elasticsearch</module>
                <module>flink-connector-elasticsearch2</module>
+               <module>flink-connector-elasticsearch5</module>
                <module>flink-connector-rabbitmq</module>
                <module>flink-connector-twitter</module>
                <module>flink-connector-nifi</module>
@@ -86,5 +87,5 @@ under the License.
                        </modules>
                </profile>
        </profiles>
-       
+
 </project>

Reply via email to