Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3112#discussion_r95968855
  
    --- Diff: docs/dev/connectors/elasticsearch.md ---
    @@ -23,158 +23,284 @@ specific language governing permissions and 
limitations
     under the License.
     -->
     
    -This connector provides a Sink that can write to an
    -[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
    -following dependency to your project:
    -
    -{% highlight xml %}
    -<dependency>
    -  <groupId>org.apache.flink</groupId>
    -  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix 
}}</artifactId>
    -  <version>{{site.version }}</version>
    -</dependency>
    -{% endhighlight %}
    +This connector provides sinks that can request document actions to an
    +[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
    +of the following dependencies to your project, depending on the version
    +of the Elasticsearch installation:
    +
    +<table class="table table-bordered">
    +  <thead>
    +    <tr>
    +      <th class="text-left">Maven Dependency</th>
    +      <th class="text-left">Supported since</th>
    +      <th class="text-left">Elasticsearch version</th>
    +    </tr>
    +  </thead>
    +  <tbody>
    +    <tr>
    +        <td>flink-connector-elasticsearch{{ site.scala_version_suffix 
}}</td>
    +        <td>1.0.0</td>
    +        <td>1.x</td>
    +    </tr>
    +    <tr>
    +        <td>flink-connector-elasticsearch2{{ site.scala_version_suffix 
}}</td>
    +        <td>1.0.0</td>
    +        <td>2.x</td>
    +    </tr>
    +    <tr>
    +        <td>flink-connector-elasticsearch5{{ site.scala_version_suffix 
}}</td>
    +        <td>1.2.0</td>
    +        <td>5.x</td>
    +    </tr>
    +  </tbody>
    +</table>
     
     Note that the streaming connectors are currently not part of the binary
    -distribution. See
    -[here]({{site.baseurl}}/dev/linking)
    -for information about how to package the program with the libraries for
    -cluster execution.
    +distribution. See [here]({{site.baseurl}}/dev/linking) for information
    +about how to package the program with the libraries for cluster execution.
     
     #### Installing Elasticsearch
     
     Instructions for setting up an Elasticsearch cluster can be found
     
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
     Make sure to set and remember a cluster name. This must be set when
    -creating a Sink for writing to your cluster
    +creating an `ElasticsearchSink` for requesting document actions against 
your cluster.
     
     #### Elasticsearch Sink
    -The connector provides a Sink that can send data to an Elasticsearch Index.
    -
    -The sink can use two different methods for communicating with 
Elasticsearch:
    -
    -1. An embedded Node
    -2. The TransportClient
     
    -See 
[here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
    -for information about the differences between the two modes.
    +The `ElasticsearchSink` uses a `TransportClient` to communicate with an
    +Elasticsearch cluster.
     
    -This code shows how to create a sink that uses an embedded Node for
    -communication:
    +The example below shows how to configure and create a sink:
     
     <div class="codetabs" markdown="1">
    -<div data-lang="java" markdown="1">
    +<div data-lang="java, Elasticsearch 1.x" markdown="1">
     {% highlight java %}
     DataStream<String> input = ...;
     
    -Map<String, String> config = Maps.newHashMap();
    +Map<String, String> config = new HashMap<>();
    +config.put("cluster.name", "my-cluster-name")
     // This instructs the sink to emit after every element, otherwise they 
would be buffered
     config.put("bulk.flush.max.actions", "1");
    -config.put("cluster.name", "my-cluster-name");
     
    -input.addSink(new ElasticsearchSink<>(config, new 
IndexRequestBuilder<String>() {
    -    @Override
    -    public IndexRequest createIndexRequest(String element, RuntimeContext 
ctx) {
    -        Map<String, Object> json = new HashMap<>();
    -        json.put("data", element);
    +List<TransportAddress> transportAddresses = new ArrayList<String>();
    +transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
    +transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));
     
    +input.addSink(new ElasticsearchSink<>(config, transportAddresses, new 
ElasticsearchSinkFunction<String>() {
    +    public IndexRequest createIndexRequest(String element) {
    +        Map<String, String> json = new HashMap<>();
    +        json.put("data", element);
    +    
             return Requests.indexRequest()
                     .index("my-index")
                     .type("my-type")
                     .source(json);
         }
    +    
    +    @Override
    +    public void process(String element, RuntimeContext ctx, RequestIndexer 
indexer) {
    +        indexer.add(createIndexRequest(element));
    +    }
     }));
     {% endhighlight %}
     </div>
    -<div data-lang="scala" markdown="1">
    +<div data-lang="java, Elasticsearch 2.x / 5.x" markdown="1">
    +{% highlight java %}
    +DataStream<String> input = ...;
    +
    +Map<String, String> config = new HashMap<>();
    +config.put("cluster.name", "my-cluster-name")
    +// This instructs the sink to emit after every element, otherwise they 
would be buffered
    +config.put("bulk.flush.max.actions", "1");
    +
    +List<InetSocketAddress> transportAddresses = new ArrayList<>();
    +transportAddresses.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
    +transportAddresses.add(new 
InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
    +
    +input.addSink(new ElasticsearchSink<>(config, transportAddresses, new 
ElasticsearchSinkFunction<String>() {
    +    public IndexRequest createIndexRequest(String element) {
    +        Map<String, String> json = new HashMap<>();
    +        json.put("data", element);
    +    
    +        return Requests.indexRequest()
    +                .index("my-index")
    +                .type("my-type")
    +                .source(json);
    +    }
    +    
    +    @Override
    +    public void process(String element, RuntimeContext ctx, RequestIndexer 
indexer) {
    +        indexer.add(createIndexRequest(element));
    +    }
    +}));{% endhighlight %}
    +</div>
    +<div data-lang="scala, Elasticsearch 1.x" markdown="1">
     {% highlight scala %}
     val input: DataStream[String] = ...
     
    -val config = new util.HashMap[String, String]
    +val config = new java.util.HashMap[String, String]
    +config.put("cluster.name", "my-cluster-name")
    +// This instructs the sink to emit after every element, otherwise they 
would be buffered
     config.put("bulk.flush.max.actions", "1")
    +
    +val transportAddresses = new java.util.ArrayList[TransportAddress]
    +transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300))
    +transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300))
    +
    +input.addSink(new ElasticsearchSink(config, transportAddresses, new 
ElasticsearchSinkFunction[String] {
    +  def createIndexRequest(element: String): IndexRequest = {
    +    val json = new java.util.HashMap[String, String]
    +    json.put("data", element)
    +    
    +    return Requests.indexRequest()
    +            .index("my-index")
    +            .type("my-type")
    +            .source(json);
    +  }
    +}))
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala, Elasticsearch 2.x / 5.x" markdown="1">
    +{% highlight scala %}
    +val input: DataStream[String] = ...
    +
    +val config = new java.util.HashMap[String, String]
     config.put("cluster.name", "my-cluster-name")
    +// This instructs the sink to emit after every element, otherwise they 
would be buffered
    +config.put("bulk.flush.max.actions", "1")
    +
    +val transportAddresses = new java.util.ArrayList[InetSocketAddress]
    +transportAddresses.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
    +transportAddresses.add(new 
InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
     
    -text.addSink(new ElasticsearchSink(config, new IndexRequestBuilder[String] 
{
    -  override def createIndexRequest(element: String, ctx: RuntimeContext): 
IndexRequest = {
    -    val json = new util.HashMap[String, AnyRef]
    +input.addSink(new ElasticsearchSink(config, transportAddresses, new 
ElasticsearchSinkFunction[String] {
    +  def createIndexRequest(element: String): IndexRequest = {
    +    val json = new java.util.HashMap[String, String]
         json.put("data", element)
    -    println("SENDING: " + element)
    -    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
    +    
    +    return Requests.indexRequest()
    +            .index("my-index")
    +            .type("my-type")
    +            .source(json);
       }
     }))
     {% endhighlight %}
     </div>
     </div>
     
    -Note how a Map of Strings is used to configure the Sink. The configuration 
keys
    -are documented in the Elasticsearch documentation
    +Note how a `Map` of `String`s is used to configure the `ElasticsearchSink`.
    +The configuration keys are documented in the Elasticsearch documentation
     
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
     Especially important is the `cluster.name` parameter that must correspond 
to
     the name of your cluster.
     
    -Internally, the sink uses a `BulkProcessor` to send index requests to the 
cluster.
    -This will buffer elements before sending a request to the cluster. The 
behaviour of the
    -`BulkProcessor` can be configured using these config keys:
    +Also note that the example only demonstrates performing a single index
    +request for each incoming element. Generally, the 
`ElasticsearchSinkFunction`
    +can be used to perform multiple requests of different types (ex.,
    +`DeleteRequest`, `UpdateRequest`, etc.). 
    +
    +Internally, the sink uses a `BulkProcessor` to send acttion requests to 
the cluster.
    +This will buffer elements before sending them in bulk to the cluster. The 
behaviour of the
    +`BulkProcessor` can be set using these config keys in the provided `Map` 
configuration:
      * **bulk.flush.max.actions**: Maximum amount of elements to buffer
      * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to 
buffer
      * **bulk.flush.interval.ms**: Interval at which to flush data regardless 
of the other two
       settings in milliseconds
     
    -This example code does the same, but with a `TransportClient`:
    +#### Communication using Embedded Node (only for Elasticsearch 1.x)
    +
    +For Elasticsearch versions 1.x, communication using an embedded node is
    +also supported. See 
[here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
    +for information about the differences between communicating with 
Elasticsearch
    +with an embedded node and a `TransportClient`.
    +
    +Below is an example of how to create an `ElasticsearchSink` use an
    +embedded node instead of a `TransportClient`:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
     DataStream<String> input = ...;
     
    -Map<String, String> config = Maps.newHashMap();
    +Map<String, String> config = new HashMap<>;
     // This instructs the sink to emit after every element, otherwise they 
would be buffered
     config.put("bulk.flush.max.actions", "1");
     config.put("cluster.name", "my-cluster-name");
     
    -List<TransportAddress> transports = new ArrayList<String>();
    -transports.add(new InetSocketTransportAddress("node-1", 9300));
    -transports.add(new InetSocketTransportAddress("node-2", 9300));
    -
    -input.addSink(new ElasticsearchSink<>(config, transports, new 
IndexRequestBuilder<String>() {
    -    @Override
    -    public IndexRequest createIndexRequest(String element, RuntimeContext 
ctx) {
    -        Map<String, Object> json = new HashMap<>();
    +input.addSink(new ElasticsearchSink<>(config, new 
ElasticsearchSinkFunction<String>() {
    +    public IndexRequest createIndexRequest(String element) {
    +        Map<String, String> json = new HashMap<>();
             json.put("data", element);
    -
    +    
             return Requests.indexRequest()
                     .index("my-index")
                     .type("my-type")
                     .source(json);
         }
    +    
    +    @Override
    +    public void process(String element, RuntimeContext ctx, RequestIndexer 
indexer) {
    +        indexer.add(createIndexRequest(element));
    +    }
     }));
     {% endhighlight %}
     </div>
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
     val input: DataStream[String] = ...
     
    -val config = new util.HashMap[String, String]
    +val config = new java.util.HashMap[String, String]
     config.put("bulk.flush.max.actions", "1")
     config.put("cluster.name", "my-cluster-name")
     
    -val transports = new ArrayList[String]
    -transports.add(new InetSocketTransportAddress("node-1", 9300))
    -transports.add(new InetSocketTransportAddress("node-2", 9300))
    -
    -text.addSink(new ElasticsearchSink(config, transports, new 
IndexRequestBuilder[String] {
    -  override def createIndexRequest(element: String, ctx: RuntimeContext): 
IndexRequest = {
    -    val json = new util.HashMap[String, AnyRef]
    +input.addSink(new ElasticsearchSink(config, new 
ElasticsearchSinkFunction[String] {
    +  def createIndexRequest(element: String): IndexRequest = {
    +    val json = new java.util.HashMap[String, String]
         json.put("data", element)
    -    println("SENDING: " + element)
    -    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
    +    
    +    return Requests.indexRequest()
    +            .index("my-index")
    +            .type("my-type")
    +            .source(json);
       }
     }))
     {% endhighlight %}
     </div>
     </div>
     
    -The difference is that we now need to provide a list of Elasticsearch Nodes
    -to which the sink should connect using a `TransportClient`.
    +The difference is that now we do not need to provide a list of addresses
    +of Elasticsearch nodes.
     
     More information about Elasticsearch can be found 
[here](https://elastic.co).
    +
    +#### Packaging the Elasticsearch Connector into an Uber-jar
    +
    +For the execution of your Flink program,
    +it is recommended to build a so-called uber-jar (executable jar) 
containing all your dependencies
    +(see [here]({{site.baseurl}}/dev/linking) for further information).
    +
    +However,
    +when an uber-jar containing an Elasticsearch sink is executed,
    +an `IllegalArgumentException` may occur,
    +which is caused by conflicting files of Elasticsearch and it's dependencies
    +in `META-INF/services`:
    +
    +```
    +IllegalArgumentException[An SPI class of type 
org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist.  
You need to add the corresponding JAR file supporting this SPI to your 
classpath.  The current classpath supports the following names: [es090, 
completion090, XBloomFilter]]
    +```
    +
    +If the uber-jar is build by means of Maven,
    +this issue can be avoided by adding the following bits to the pom file:
    +
    +```
    +<transformer 
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    +    <resource>META-INF/services/org.apache.lucene.codecs.Codec</resource>
    +</transformer>
    +<transformer 
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    +    
<resource>META-INF/services/org.apache.lucene.codecs.DocValuesFormat</resource>
    +</transformer>
    +<transformer 
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    +   
<resource>META-INF/services/org.apache.lucene.codecs.PostingsFormat</resource>
    +</transformer>
    +```
    --- End diff --
    
    +1 for explaining how to resolve this issue!
    It would be good if you could mention in the docs where to add the 
transformers exactly.
    Also, I'm wondering if you could use the ServicesTransformer instead: 
http://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to