[
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15856129#comment-15856129
]
ASF GitHub Bot commented on FLINK-4988:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3112#discussion_r99837460
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -23,158 +23,291 @@ specific language governing permissions and
limitations
under the License.
-->
-This connector provides a Sink that can write to an
-[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix }}</artifactId>
- <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
+This connector provides sinks that can request document actions to an
+[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
+of the following dependencies to your project, depending on the version
+of the Elasticsearch installation:
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left">Maven Dependency</th>
+ <th class="text-left">Supported since</th>
+ <th class="text-left">Elasticsearch version</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>flink-connector-elasticsearch{{ site.scala_version_suffix }}</td>
+ <td>1.0.0</td>
+ <td>1.x</td>
+ </tr>
+ <tr>
+ <td>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</td>
+ <td>1.0.0</td>
+ <td>2.x</td>
+ </tr>
+ <tr>
+ <td>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</td>
+ <td>1.2.0</td>
+ <td>5.x</td>
+ </tr>
+ </tbody>
+</table>
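(Editor's illustration, not part of the diff: each table row maps onto a POM dependency in the same shape as the removed 1.x snippet above; the Liquid variables are resolved at documentation build time. For example, the 5.x row would become:)

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</artifactId>
  <version>{{ site.version }}</version>
</dependency>
```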
Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking.html)
-for information about how to package the program with the libraries for
-cluster execution.
+distribution. See [here]({{site.baseurl}}/dev/linking.html) for information
+about how to package the program with the libraries for cluster execution.
#### Installing Elasticsearch
Instructions for setting up an Elasticsearch cluster can be found
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
+creating an `ElasticsearchSink` for requesting document actions against your cluster.
#### Elasticsearch Sink
-The connector provides a Sink that can send data to an Elasticsearch Index.
-
-The sink can use two different methods for communicating with Elasticsearch:
-
-1. An embedded Node
-2. The TransportClient
-See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
-for information about the differences between the two modes.
+The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+Elasticsearch cluster.
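(Editor's sketch, not part of the diff: the user-config `Map` handed to the sink constructor can be built as below. Only `cluster.name` and `bulk.flush.max.actions` appear in this diff; treat any other connector keys as assumptions to verify against the connector source.)

```java
import java.util.HashMap;
import java.util.Map;

public class SinkConfigSketch {

    // Builds the user-config map that is passed to the ElasticsearchSink
    // constructor in the examples below. Both keys used here are taken
    // from the diff; other keys are not assumed.
    static Map<String, String> buildConfig(String clusterName, int maxBulkActions) {
        Map<String, String> config = new HashMap<>();
        config.put("cluster.name", clusterName);
        // Flush after every maxBulkActions elements; "1" means emit each
        // element immediately instead of buffering a larger bulk request.
        config.put("bulk.flush.max.actions", String.valueOf(maxBulkActions));
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> config = buildConfig("my-cluster-name", 1);
        System.out.println(config.get("cluster.name"));
        System.out.println(config.get("bulk.flush.max.actions"));
    }
}
```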
-This code shows how to create a sink that uses an embedded Node for
-communication:
+The example below shows how to configure and create a sink:
<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+<div data-lang="java, Elasticsearch 1.x" markdown="1">
{% highlight java %}
DataStream<String> input = ...;
-Map<String, String> config = Maps.newHashMap();
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
-input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
- @Override
- public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
- Map<String, Object> json = new HashMap<>();
- json.put("data", element);
+List<TransportAddress> transportAddresses = new ArrayList<>();
+transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
+transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));
+input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
+ public IndexRequest createIndexRequest(String element) {
+ Map<String, String> json = new HashMap<>();
+ json.put("data", element);
+
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
+
+ @Override
+ public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+ indexer.add(createIndexRequest(element));
+ }
}));
{% endhighlight %}
</div>
-<div data-lang="scala" markdown="1">
+<div data-lang="java, Elasticsearch 2.x / 5.x" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name");
+// This instructs the sink to emit after every element, otherwise they would be buffered
+config.put("bulk.flush.max.actions", "1");
+
+List<InetSocketAddress> transportAddresses = new ArrayList<>();
+transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
+
+input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
+ public IndexRequest createIndexRequest(String element) {
+ Map<String, String> json = new HashMap<>();
+ json.put("data", element);
+
+ return Requests.indexRequest()
+ .index("my-index")
+ .type("my-type")
+ .source(json);
+ }
+
+ @Override
+ public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+ indexer.add(createIndexRequest(element));
+ }
+}));
{% endhighlight %}
+</div>
+<div data-lang="scala, Elasticsearch 1.x" markdown="1">
{% highlight scala %}
val input: DataStream[String] = ...
-val config = new util.HashMap[String, String]
+val config = new java.util.HashMap[String, String]
+config.put("cluster.name", "my-cluster-name")
--- End diff ---
This is scala code, so the semicolon is ignored ;-) .
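(Editor's sketch, not part of the comment: the pattern the new examples document, an `ElasticsearchSinkFunction` building requests and handing them to a `RequestIndexer`, can be illustrated with plain-Java stand-ins. The two interfaces below are deliberately simplified; the real ones live in the flink-connector-elasticsearch* modules and carry more methods and types.)

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for Flink's RequestIndexer: collects actions
// instead of sending them through a TransportClient.
interface RequestIndexer {
    void add(Map<String, Object> indexRequest);
}

// Simplified stand-in for ElasticsearchSinkFunction<T>.
interface SinkFunctionSketch<T> {
    void process(T element, RequestIndexer indexer);
}

public class ElasticsearchSinkSketch {
    public static void main(String[] args) {
        // Buffer "index requests" locally so the flow is observable.
        List<Map<String, Object>> buffered = new ArrayList<>();
        RequestIndexer indexer = buffered::add;

        // Mirrors the examples: wrap each element as {"data": element}
        // and hand it to the indexer.
        SinkFunctionSketch<String> sinkFunction = (element, idx) -> {
            Map<String, Object> json = new HashMap<>();
            json.put("data", element);
            idx.add(json);
        };

        sinkFunction.process("hello", indexer);
        sinkFunction.process("world", indexer);

        System.out.println(buffered.size());             // 2
        System.out.println(buffered.get(0).get("data")); // hello
    }
}
```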
> Elasticsearch 5.x support
> -------------------------
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Reporter: Mike Dias
>
> Elasticsearch 5.x was released:
> https://www.elastic.co/blog/elasticsearch-5-0-0-released
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)