MartijnVisser commented on a change in pull request #17930: URL: https://github.com/apache/flink/pull/17930#discussion_r759937660
##########
File path: docs/content/docs/connectors/datastream/elasticsearch.md
##########
@@ -65,240 +61,90 @@ about how to package the program with the libraries for cluster execution.
 Instructions for setting up an Elasticsearch cluster can be found [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
-Make sure to set and remember a cluster name. This must be set when
-creating an `ElasticsearchSink` for requesting document actions against your cluster.

 ## Elasticsearch Sink

-The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an
-Elasticsearch cluster.
-
 The example below shows how to configure and create a sink:

 {{< tabs "51732edd-4218-470e-adad-b1ebb4021ae4" >}}
-{{< tab "java, 5.x" >}}
-```java
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-DataStream<String> input = ...;
-
-Map<String, String> config = new HashMap<>();
-config.put("cluster.name", "my-cluster-name");
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1");
-
-List<InetSocketAddress> transportAddresses = new ArrayList<>();
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
-    public IndexRequest createIndexRequest(String element) {
-        Map<String, String> json = new HashMap<>();
-        json.put("data", element);
-
-        return Requests.indexRequest()
-                .index("my-index")
-                .type("my-type")
-                .source(json);
-    }
-
-    @Override
-    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-        indexer.add(createIndexRequest(element));
-    }
-}));```
-{{< /tab >}}
 {{< tab "java, Elasticsearch 6.x and above" >}}

Review comment:
Yeah, I think it's better to mention in one place which versions we support and then, in the example, list it as ES6 and ES7.
