alpreu commented on a change in pull request #17930:
URL: https://github.com/apache/flink/pull/17930#discussion_r762905457
##########
File path: docs/content/docs/connectors/datastream/elasticsearch.md
##########
@@ -65,240 +61,90 @@ about how to package the program with the libraries for cluster execution.
 Instructions for setting up an Elasticsearch cluster can be found
 [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
-Make sure to set and remember a cluster name. This must be set when
-creating an `ElasticsearchSink` for requesting document actions against your cluster.
 
 ## Elasticsearch Sink
 
-The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an
-Elasticsearch cluster.
-
 The example below shows how to configure and create a sink:
 
 {{< tabs "51732edd-4218-470e-adad-b1ebb4021ae4" >}}
-{{< tab "java, 5.x" >}}
-```java
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-DataStream<String> input = ...;
-
-Map<String, String> config = new HashMap<>();
-config.put("cluster.name", "my-cluster-name");
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1");
-
-List<InetSocketAddress> transportAddresses = new ArrayList<>();
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
-    public IndexRequest createIndexRequest(String element) {
-        Map<String, String> json = new HashMap<>();
-        json.put("data", element);
-
-        return Requests.indexRequest()
-                .index("my-index")
-                .type("my-type")
-                .source(json);
-    }
-
-    @Override
-    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-        indexer.add(createIndexRequest(element));
-    }
-}));```
-{{< /tab >}}
 {{< tab "java, Elasticsearch 6.x and above" >}}
 ```java
-import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
 
 import org.apache.http.HttpHost;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Requests;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-DataStream<String> input = ...;
-
-List<HttpHost> httpHosts = new ArrayList<>();
-httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
-httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
-
-// use a ElasticsearchSink.Builder to create an ElasticsearchSink
-ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
-    httpHosts,
-    new ElasticsearchSinkFunction<String>() {
-        public IndexRequest createIndexRequest(String element) {
-            Map<String, String> json = new HashMap<>();
-            json.put("data", element);
-
-            return Requests.indexRequest()
-                    .index("my-index")
-                    .type("my-type")
-                    .source(json);
-        }
-
-        @Override
-        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-            indexer.add(createIndexRequest(element));
-        }
-    }
-);
-
-// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
-esSinkBuilder.setBulkFlushMaxActions(1);
-
-// provide a RestClientFactory for custom configuration on the internally created REST client
-esSinkBuilder.setRestClientFactory(
-    restClientBuilder -> {
-        restClientBuilder.setDefaultHeaders(...)
-        restClientBuilder.setMaxRetryTimeoutMillis(...)
-        restClientBuilder.setPathPrefix(...)
-        restClientBuilder.setHttpClientConfigCallback(...)
-    }
-);
-
-// finally, build and add the sink to the job's pipeline
-input.addSink(esSinkBuilder.build());
-```
-{{< /tab >}}
-{{< tab "scala, 5.x" >}}
-```scala
-import org.apache.flink.api.common.functions.RuntimeContext
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
-
-import org.elasticsearch.action.index.IndexRequest
-import org.elasticsearch.client.Requests
+...
-
-import java.net.InetAddress
-import java.net.InetSocketAddress
-import java.util.ArrayList
-import java.util.HashMap
-import java.util.List
-import java.util.Map
+
+DataStream<String> input = ...;
-
-val input: DataStream[String] = ...
+
+input.sinkTo(
+    new Elasticsearch6SinkBuilder<String>()  // For Elasticsearch 7.x use Elasticsearch7SinkBuilder
+        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+            (element, context, indexer) ->
+                indexer.add(createIndexRequest(element)))
+        .build());
-
-val config = new java.util.HashMap[String, String]
-config.put("cluster.name", "my-cluster-name")
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1")
+
+...
-val transportAddresses = new java.util.ArrayList[InetSocketAddress]
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
+private static IndexRequest createIndexRequest(String element) {
+    Map<String, Object> json = new HashMap<>();
+    json.put("data", element);
-
-input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
-    def createIndexRequest(element: String): IndexRequest = {
-        val json = new java.util.HashMap[String, String]
-        json.put("data", element)
-
-        return Requests.indexRequest()
-                .index("my-index")
-                .type("my-type")
-                .source(json)
-    }
-}))
+
+    return Requests.indexRequest()
+        .index("my-index")
+        .type("my-type")
+        .id(element)
+        .source(json);
+}
 ```
 {{< /tab >}}
 {{< tab "scala, Elasticsearch 6.x and above" >}}
 ```scala
-import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.api.connector.sink.SinkWriter
+import org.apache.flink.connector.elasticsearch.sink.{Elasticsearch6SinkBuilder, RequestIndexer}
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
-import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
-
 import org.apache.http.HttpHost
 import org.elasticsearch.action.index.IndexRequest
 import org.elasticsearch.client.Requests
 
-import java.util.ArrayList
-import java.util.List
+...
 
 val input: DataStream[String] = ...
 
-val httpHosts = new java.util.ArrayList[HttpHost]
-httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
-httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))
-
-val esSinkBuilder = new ElasticsearchSink.Builder[String](
-    httpHosts,
-    new ElasticsearchSinkFunction[String] {
-        def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
-            val json = new java.util.HashMap[String, String]
-            json.put("data", element)
-
-            val rqst: IndexRequest = Requests.indexRequest
-                .index("my-index")
-                .`type`("my-type")
-                .source(json)
-
-            indexer.add(rqst)
-        }
-    }
-)
-
-// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
-esSinkBuilder.setBulkFlushMaxActions(1)
-
-// provide a RestClientFactory for custom configuration on the internally created REST client
-esSinkBuilder.setRestClientFactory(new RestClientFactory {
-    override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
-        restClientBuilder.setDefaultHeaders(...)
-        restClientBuilder.setMaxRetryTimeoutMillis(...)
-        restClientBuilder.setPathPrefix(...)
-        restClientBuilder.setHttpClientConfigCallback(...)
-    }
-})
-
-// finally, build and add the sink to the job's pipeline
-input.addSink(esSinkBuilder.build)
+input.sinkTo(
+    new Elasticsearch6SinkBuilder[String]  // For Elasticsearch 7.x use Elasticsearch7SinkBuilder
+        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
+            indexer.add(createIndexRequest(element)))
+        .build())
+
+...
+
+def createIndexRequest(element: (String)): IndexRequest = {
+
+  val json = Map(
+    "data" -> element.asInstanceOf[AnyRef]
+  )
+
+  Requests.indexRequest.index("my-index").`type`("my-type").source(mapAsJavaMap(json))
+}
 ```
 {{< /tab >}}

Review comment:
   I had a look at the other connectors and kept the tabs as `java` and `scala`, adding an example for each Elasticsearch version inside them.
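To make that concrete, here is a minimal sketch of what the per-version example for Elasticsearch 7.x inside the `java` tab could look like. This is not part of the PR: it assumes `Elasticsearch7SinkBuilder` mirrors the `Elasticsearch6SinkBuilder` API shown in the diff (the diff's own comment points at that class), and the class name `Elasticsearch7Example` and the job name are made up for illustration.

```java
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

// Hypothetical wrapper class, named here only for illustration
public class Elasticsearch7Example {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.fromElements("foo", "bar");

        input.sinkTo(
            new Elasticsearch7SinkBuilder<String>()
                .setBulkFlushMaxActions(1) // emit after every element; otherwise requests are buffered
                .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
                .setEmitter(
                    (element, context, indexer) ->
                        indexer.add(createIndexRequest(element)))
                .build());

        env.execute("es7-sink-example");
    }

    private static IndexRequest createIndexRequest(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        // Elasticsearch 7.x removed mapping types, so no .type(...) call here
        return Requests.indexRequest()
            .index("my-index")
            .id(element)
            .source(json);
    }
}
```

Assuming the two builders do mirror each other, the only substantive differences from the 6.x snippet in the diff would be the builder class and the dropped `.type(...)` call.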
