[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15821605#comment-15821605
 ] 

ASF GitHub Bot commented on FLINK-4988:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3112#discussion_r95969123
  
    --- Diff: docs/dev/connectors/elasticsearch2.md ---
    @@ -1,173 +0,0 @@
    ----
    -title: "Elasticsearch 2.x Connector"
    -nav-title: Elasticsearch 2.x
    -nav-parent_id: connectors
    -nav-pos: 5
    ----
    -<!--
    -Licensed to the Apache Software Foundation (ASF) under one
    -or more contributor license agreements.  See the NOTICE file
    -distributed with this work for additional information
    -regarding copyright ownership.  The ASF licenses this file
    -to you under the Apache License, Version 2.0 (the
    -"License"); you may not use this file except in compliance
    -with the License.  You may obtain a copy of the License at
    -
    -  http://www.apache.org/licenses/LICENSE-2.0
    -
    -Unless required by applicable law or agreed to in writing,
    -software distributed under the License is distributed on an
    -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    -KIND, either express or implied.  See the License for the
    -specific language governing permissions and limitations
    -under the License.
    --->
    -
    -This connector provides a Sink that can write to an
    -[Elasticsearch 2.x](https://elastic.co/) Index. To use this connector, add 
the
    -following dependency to your project:
    -
    -{% highlight xml %}
    -<dependency>
    -  <groupId>org.apache.flink</groupId>
    -  <artifactId>flink-connector-elasticsearch2{{ site.scala_version_suffix 
}}</artifactId>
    -  <version>{{site.version }}</version>
    -</dependency>
    -{% endhighlight %}
    -
    -Note that the streaming connectors are currently not part of the binary
    -distribution. See
    -[here]({{site.baseurl}}/dev/linking)
    -for information about how to package the program with the libraries for
    -cluster execution.
    -
    -#### Installing Elasticsearch 2.x
    -
    -Instructions for setting up an Elasticsearch cluster can be found
    
-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
    -Make sure to set and remember a cluster name. This must be set when
    -creating a Sink for writing to your cluster
    -
    -#### Elasticsearch 2.x Sink
    -The connector provides a Sink that can send data to an Elasticsearch 2.x 
Index.
    -
    -The sink communicates with Elasticsearch via Transport Client
    -
    -See 
[here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html)
    -for information about the Transport Client.
    -
    -The code below shows how to create a sink that uses a `TransportClient` 
for communication:
    -
    -<div class="codetabs" markdown="1">
    -<div data-lang="java" markdown="1">
    -{% highlight java %}
    -File dataDir = ....;
    -
    -DataStream<String> input = ...;
    -
    -Map<String, String> config = new HashMap<>();
    -// This instructs the sink to emit after every element, otherwise they 
would be buffered
    -config.put("bulk.flush.max.actions", "1");
    -config.put("cluster.name", "my-cluster-name");
    -
    -List<InetSocketAddress> transports = new ArrayList<>();
    -transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 
9300));
    -transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 
9300));
    -
    -input.addSink(new ElasticsearchSink(config, transports, new 
ElasticsearchSinkFunction<String>() {
    -  public IndexRequest createIndexRequest(String element) {
    -    Map<String, String> json = new HashMap<>();
    -    json.put("data", element);
    -
    -    return Requests.indexRequest()
    -            .index("my-index")
    -            .type("my-type")
    -            .source(json);
    -  }
    -
    -  @Override
    -  public void process(String element, RuntimeContext ctx, RequestIndexer 
indexer) {
    -    indexer.add(createIndexRequest(element));
    -  }
    -}));
    -{% endhighlight %}
    -</div>
    -<div data-lang="scala" markdown="1">
    -{% highlight scala %}
    -val dataDir = ....;
    -
    -val input: DataStream[String] = ...
    -
    -val config = new util.HashMap[String, String]
    -config.put("bulk.flush.max.actions", "1")
    -config.put("cluster.name", "my-cluster-name")
    -
    -val transports = new ArrayList[String]
    -transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 
9300))
    -transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 
9300));
    -
    -input.addSink(new ElasticsearchSink(config, transports, new 
ElasticsearchSinkFunction[String] {
    -  def createIndexRequest(element: String): IndexRequest = {
    -    val json = new util.HashMap[String, AnyRef]
    -    json.put("data", element)
    -    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
    -  }
    -
    -  override def process(element: String, ctx: RuntimeContext, indexer: 
RequestIndexer) {
    -    indexer.add(createIndexRequest(element))
    -  }
    -}))
    -{% endhighlight %}
    -</div>
    -</div>
    -
    -A Map of Strings is used to configure the Sink. The configuration keys
    -are documented in the Elasticsearch documentation
    
-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
    -Especially important is the `cluster.name`. parameter that must correspond 
to
    -the name of your cluster and with ElasticSearch 2x you also need to 
specify `path.home`.
    -
    -Internally, the sink uses a `BulkProcessor` to send Action requests to the 
cluster.
    -This will buffer elements and Action Requests before sending to the 
cluster. The behaviour of the
    -`BulkProcessor` can be configured using these config keys:
    - * **bulk.flush.max.actions**: Maximum amount of elements to buffer
    - * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to 
buffer
    - * **bulk.flush.interval.ms**: Interval at which to flush data regardless 
of the other two
    -  settings in milliseconds
    -
    -This now provides a list of Elasticsearch Nodes
    -to which the sink should connect via a `TransportClient`.
    -
    -More information about Elasticsearch can be found 
[here](https://elastic.co).
    -
    -
    -#### Packaging the Elasticsearch Connector into an Uber-jar
    -
    -For the execution of your Flink program,
    -it is recommended to build a so-called uber-jar (executable jar) 
containing all your dependencies
    -(see [here]({{site.baseurl}}/dev/linking) for further information).
    -
    -However,
    -when an uber-jar containing an Elasticsearch sink is executed,
    -an `IllegalArgumentException` may occur,
    -which is caused by conflicting files of Elasticsearch and it's dependencies
    -in `META-INF/services`:
    -
    -```
    -IllegalArgumentException[An SPI class of type 
org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist.  
You need to add the corresponding JAR file supporting this SPI to your 
classpath.  The current classpath supports the following names: [es090, 
completion090, XBloomFilter]]
    -```
    -
    -If the uber-jar is build by means of maven,
    -this issue can be avoided by adding the following bits to the pom file:
    -
    -```
    -<transformer 
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    -    <resource>META-INF/services/org.apache.lucene.codecs.Codec</resource>
    -</transformer>
    -<transformer 
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    -    
<resource>META-INF/services/org.apache.lucene.codecs.DocValuesFormat</resource>
    -</transformer>
    -<transformer 
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    -   
<resource>META-INF/services/org.apache.lucene.codecs.PostingsFormat</resource>
    -</transformer>
    --- End diff --
    
    Ah, I see. This has been there before :) Still, would be great if you could 
fix it :)


> Elasticsearch 5.x support
> -------------------------
>
>                 Key: FLINK-4988
>                 URL: https://issues.apache.org/jira/browse/FLINK-4988
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to