[ 
https://issues.apache.org/jira/browse/FLINK-3115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15201332#comment-15201332
 ] 

ASF GitHub Bot commented on FLINK-3115:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1792#discussion_r56641925
  
    --- Diff: docs/apis/streaming/connectors/elasticsearch2.md ---
    @@ -0,0 +1,207 @@
    +---
    +title: "Elasticsearch 2.x Connector"
    +
    +# Sub-level navigation
    +sub-nav-group: streaming
    +sub-nav-parent: connectors
    +sub-nav-pos: 2
    +sub-nav-title: Elasticsearch 2.x
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +This connector provides a Sink that can write to an
    +[Elasticsearch 2.x](https://elastic.co/) Index. To use this connector, add the
    +following dependency to your project:
    +
    +{% highlight xml %}
    +<dependency>
    +  <groupId>org.apache.flink</groupId>
    +  <artifactId>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</artifactId>
    +  <version>{{site.version }}</version>
    +</dependency>
    +{% endhighlight %}
    +
    +Note that the streaming connectors are currently not part of the binary
    +distribution. See
    +[here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
    +for information about how to package the program with the libraries for
    +cluster execution.
    +
    +#### Installing Elasticsearch 2.x
    +
    +Instructions for setting up an Elasticsearch cluster can be found
    +[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
    +Make sure to set and remember a cluster name. This must be set when
    +creating a Sink for writing to your cluster
    +
    +#### Elasticsearch 2.x Sink
    +The connector provides a Sink that can send data to an Elasticsearch 2.x Index.
    +
    +The sink communicates with Elasticsearch in 2 ways:
    +
    +1. An embedded Node
    +2. The TransportClient
    +
    +See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
    +for information about the differences between the two modes.
    +
    +This code shows how to create a sink that uses an embedded Node for communication:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<String> input = ...;
    +
    +Map<String, String> config = new HashMap<>;
    +// This instructs the sink to emit after every element, otherwise they would be buffered
    +config.put("bulk.flush.max.actions", "1");
    +config.put("cluster.name", "my-cluster-name");
    +
    +// need this with ElasticSearch v2.x
    +config.put("path.home", dataDir.getParent());
    +
    +input.addSink(new ElasticsearchSink<>(config, new ElasticSearchSinkFunction<String>() {
    +    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
    +        Map<String, Object> json = new HashMap<>();
    +        json.put("data", element);
    +
    +        return Requests.indexRequest()
    +                .index("my-index")
    +                .type("my-type")
    +                .source(json);
    +    }
    +    
    +    @Override
    +    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
    +        indexer.add(createIndexRequest(element));
    +    }
    +}));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val input: DataStream[String] = ...
    +
    +val config = new util.HashMap[String, String]
    +config.put("bulk.flush.max.actions", "1")
    +config.put("cluster.name", "my-cluster-name")
    +
    +// need this with ElasticSearch v2.x
    +config.put("path.home", dataDir.getParent());
    +
    +text.addSink(new ElasticsearchSink(config, new ElasticSearchSinkFunction[String] {
    +  def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
    +    val json = new util.HashMap[String, AnyRef]
    +    json.put("data", element)
    +    println("SENDING: " + element)
    +    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
    +  }
    +  
    +  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
    +    indexer.add(element))
    +  }
    +}))
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Note how a Map of Strings is used to configure the Sink. The configuration keys
    +are documented in the Elasticsearch documentation
    +[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
    +Especially important is the `cluster.name`. parameter that must correspond to
    +the name of your cluster.
    +
    +Internally, the sink uses a `BulkProcessor` to send Action requests to the cluster.
    +This will buffer elements and Action Requests before sending to the cluster. The behaviour of the
    +`BulkProcessor` can be configured using these config keys:
    + * **bulk.flush.max.actions**: Maximum amount of elements to buffer
    + * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
    + * **bulk.flush.interval.ms**: Interval at which to flush data regardless of the other two
    +  settings in milliseconds
    +
    +The example code below does the same using a `TransportClient`:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<String> input = ...;
    +
    +Map<String, String> config = new HashMap<>();
    +// This instructs the sink to emit after every element, otherwise they would be buffered
    +config.put("bulk.flush.max.actions", "1");
    +config.put("cluster.name", "my-cluster-name");
    +
    +// need this with ElasticSearch v2.x
    +config.put("path.home", dataDir.getParent());
    +
    +List<TransportAddress> transports = new ArrayList<>();
    --- End diff --
    
    This has to be `List<InetSocketAddress>`
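
For illustration, the suggested type could be populated roughly as follows. This is only a sketch using JDK classes: the class name `TransportAddressExample`, the helper `buildTransports`, and the host `127.0.0.1` are hypothetical, and port 9300 is assumed because it is Elasticsearch's default transport port. How the list is then handed to the sink is not shown in the diff above and is left out here.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

public class TransportAddressExample {

    // Hypothetical helper: collects the addresses of the Elasticsearch nodes
    // that a TransportClient should connect to.
    static List<InetSocketAddress> buildTransports() throws UnknownHostException {
        List<InetSocketAddress> transports = new ArrayList<>();
        // Assumed host and port; replace with the addresses of your own cluster.
        transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
        return transports;
    }

    public static void main(String[] args) throws UnknownHostException {
        // Print each configured address so the result can be inspected.
        for (InetSocketAddress address : buildTransports()) {
            System.out.println(address.getAddress().getHostAddress() + ":" + address.getPort());
        }
    }
}
```

Using `InetSocketAddress` keeps the documentation example free of Elasticsearch-specific address classes on the user side.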


> Update Elasticsearch connector to 2.X
> -------------------------------------
>
>                 Key: FLINK-3115
>                 URL: https://issues.apache.org/jira/browse/FLINK-3115
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming Connectors
>    Affects Versions: 0.10.0, 1.0.0, 0.10.1
>            Reporter: Maximilian Michels
>            Assignee: Suneel Marthi
>             Fix For: 1.0.1
>
>
> The Elasticsearch connector is not up to date anymore. In version 2.X the API 
> changed. The code needs to be adapted. Probably it makes sense to have a new 
> class {{ElasticsearchSink2}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
