[
https://issues.apache.org/jira/browse/FLINK-3115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15201332#comment-15201332
]
ASF GitHub Bot commented on FLINK-3115:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/1792#discussion_r56641925
--- Diff: docs/apis/streaming/connectors/elasticsearch2.md ---
@@ -0,0 +1,207 @@
+---
+title: "Elasticsearch 2.x Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 2
+sub-nav-title: Elasticsearch 2.x
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This connector provides a Sink that can write to an
+[Elasticsearch 2.x](https://elastic.co/) Index. To use this connector, add the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</artifactId>
+ <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
+for information about how to package the program with the libraries for
+cluster execution.
+
+#### Installing Elasticsearch 2.x
+
+Instructions for setting up an Elasticsearch cluster can be found
+[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
+Make sure to set and remember a cluster name. This must be set when
+creating a Sink for writing to your cluster.
+
+#### Elasticsearch 2.x Sink
+The connector provides a Sink that can send data to an Elasticsearch 2.x Index.
+
+The sink communicates with Elasticsearch in two ways:
+
+1. An embedded Node
+2. The TransportClient
+
+See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
+for information about the differences between the two modes.
+
+This code shows how to create a sink that uses an embedded Node for communication:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+Map<String, String> config = new HashMap<>();
+// This instructs the sink to emit after every element, otherwise they would be buffered
+config.put("bulk.flush.max.actions", "1");
+config.put("cluster.name", "my-cluster-name");
+
+// need this with Elasticsearch v2.x
+config.put("path.home", dataDir.getParent());
+
+input.addSink(new ElasticsearchSink<>(config, new ElasticsearchSinkFunction<String>() {
+ public IndexRequest createIndexRequest(String element) {
+ Map<String, Object> json = new HashMap<>();
+ json.put("data", element);
+
+ return Requests.indexRequest()
+ .index("my-index")
+ .type("my-type")
+ .source(json);
+ }
+
+ @Override
+ public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+ indexer.add(createIndexRequest(element));
+ }
+}));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+val config = new util.HashMap[String, String]
+config.put("bulk.flush.max.actions", "1")
+config.put("cluster.name", "my-cluster-name")
+
+// need this with Elasticsearch v2.x
+config.put("path.home", dataDir.getParent())
+
+input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String] {
+  def createIndexRequest(element: String): IndexRequest = {
+    val json = new util.HashMap[String, AnyRef]
+    json.put("data", element)
+    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
+  }
+
+  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
+    indexer.add(createIndexRequest(element))
+  }
+}))
+{% endhighlight %}
+</div>
+</div>
+
+Note how a Map of Strings is used to configure the Sink. The configuration keys
+are documented in the Elasticsearch documentation
+[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
+Especially important is the `cluster.name` parameter, which must correspond to
+the name of your cluster.
+
+Internally, the sink uses a `BulkProcessor` to send action requests to the cluster.
+This buffers elements and action requests before sending them to the cluster. The behaviour of the
+`BulkProcessor` can be configured using these config keys (a combined sketch follows the list):
+ * **bulk.flush.max.actions**: Maximum amount of elements to buffer
+ * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
+ * **bulk.flush.interval.ms**: Interval at which to flush data regardless of the other two
+   settings, in milliseconds
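+
+As a rough sketch of how these keys fit together (the values below are only
+illustrative, not recommendations), they go into the same configuration `Map`
+that is passed to the sink:
+
+{% highlight java %}
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name");
+// flush once 1000 actions have been buffered ...
+config.put("bulk.flush.max.actions", "1000");
+// ... or once the buffered data exceeds 5 MB ...
+config.put("bulk.flush.max.size.mb", "5");
+// ... and at least once a minute, whichever happens first
+config.put("bulk.flush.interval.ms", "60000");
+{% endhighlight %}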
+
+The example code below does the same using a `TransportClient`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+Map<String, String> config = new HashMap<>();
+// This instructs the sink to emit after every element, otherwise they would be buffered
+config.put("bulk.flush.max.actions", "1");
+config.put("cluster.name", "my-cluster-name");
+
+// need this with Elasticsearch v2.x
+config.put("path.home", dataDir.getParent());
+
+List<TransportAddress> transports = new ArrayList<>();
--- End diff --
This has to be `List<InetSocketAddress>`
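For illustration only, a corrected sketch of that part of the example (host and port are placeholders for one of the cluster's transport endpoints) could read:

```java
List<InetSocketAddress> transports = new ArrayList<>();
// 9300 is Elasticsearch's default transport port
transports.add(new InetSocketAddress("127.0.0.1", 9300));
```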
> Update Elasticsearch connector to 2.X
> -------------------------------------
>
> Key: FLINK-3115
> URL: https://issues.apache.org/jira/browse/FLINK-3115
> Project: Flink
> Issue Type: Improvement
> Components: Streaming Connectors
> Affects Versions: 0.10.0, 1.0.0, 0.10.1
> Reporter: Maximilian Michels
> Assignee: Suneel Marthi
> Fix For: 1.0.1
>
>
> The Elasticsearch connector is not up to date anymore. In version 2.X the API
> changed. The code needs to be adapted. Probably it makes sense to have a new
> class {{ElasticsearchSink2}}.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)