alpreu commented on a change in pull request #17930:
URL: https://github.com/apache/flink/pull/17930#discussion_r759917751
##########
File path: docs/content/docs/connectors/datastream/elasticsearch.md
##########
@@ -65,240 +61,90 @@ about how to package the program with the libraries for cluster execution.
 Instructions for setting up an Elasticsearch cluster can be found [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
-Make sure to set and remember a cluster name. This must be set when
-creating an `ElasticsearchSink` for requesting document actions against your cluster.
 
 ## Elasticsearch Sink
 
-The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an
-Elasticsearch cluster.
-
 The example below shows how to configure and create a sink:
 
 {{< tabs "51732edd-4218-470e-adad-b1ebb4021ae4" >}}
-{{< tab "java, 5.x" >}}
-```java
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-DataStream<String> input = ...;
-
-Map<String, String> config = new HashMap<>();
-config.put("cluster.name", "my-cluster-name");
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1");
-
-List<InetSocketAddress> transportAddresses = new ArrayList<>();
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
-    public IndexRequest createIndexRequest(String element) {
-        Map<String, String> json = new HashMap<>();
-        json.put("data", element);
-
-        return Requests.indexRequest()
-                .index("my-index")
-                .type("my-type")
-                .source(json);
-    }
-
-    @Override
-    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-        indexer.add(createIndexRequest(element));
-    }
-}));```
-{{< /tab >}}
 {{< tab "java, Elasticsearch 6.x and above" >}}

Review comment:
I am thinking about removing the version here completely. We already state it at the top of the page, and putting it here with the minor version might cause even more confusion because we do support, e.g., 6.5. What do you think?
