MartijnVisser commented on a change in pull request #17930:
URL: https://github.com/apache/flink/pull/17930#discussion_r759666055
##########
File path: docs/content/docs/connectors/datastream/elasticsearch.md
##########
@@ -43,15 +43,11 @@ of the Elasticsearch installation:
</thead>
<tbody>
<tr>
- <td>5.x</td>
- <td>{{< artifact flink-connector-elasticsearch5 >}}</td>
- </tr>
- <tr>
- <td>6.x</td>
+ <td><= 6.3.1</td>
<td>{{< artifact flink-connector-elasticsearch6 >}}</td>
</tr>
<tr>
- <td>7 and later versions</td>
+ <td><= 7.5.1</td>
Review comment:
Don't we support the entire 7.* series?
##########
File path: docs/content/docs/connectors/datastream/elasticsearch.md
##########
@@ -43,15 +43,11 @@ of the Elasticsearch installation:
</thead>
<tbody>
<tr>
- <td>5.x</td>
- <td>{{< artifact flink-connector-elasticsearch5 >}}</td>
- </tr>
- <tr>
- <td>6.x</td>
+ <td><= 6.3.1</td>
Review comment:
Shouldn't this be 6.8.* if we mirror Elasticsearch's EOL policy (https://www.elastic.co/support/eol)?
`For these users, we maintain the last minor of the prior major release
series. For example, with Elasticsearch 6.x, we are maintaining the 6.8.x
series. This allows these users to obtain fixes while making only minor changes
to their running software. This last minor will be maintained until the release
of the second subsequent major version. For example, Elasticsearch 6.8.x will
be maintained until the GA release of Elasticsearch 8.0.0. At the release of
Elasticsearch 8.0.0, we will continue to maintain the last 7.x series, and
begin maintaining the 8.0.x minor series, then 8.1.x series, then 8.2.x series
of minor releases.`
##########
File path: docs/content/docs/connectors/datastream/elasticsearch.md
##########
@@ -65,240 +61,90 @@ about how to package the program with the libraries for cluster execution.
Instructions for setting up an Elasticsearch cluster can be found
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
-Make sure to set and remember a cluster name. This must be set when
-creating an `ElasticsearchSink` for requesting document actions against your cluster.
## Elasticsearch Sink
-The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an
-Elasticsearch cluster.
-
The example below shows how to configure and create a sink:
{{< tabs "51732edd-4218-470e-adad-b1ebb4021ae4" >}}
-{{< tab "java, 5.x" >}}
-```java
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-DataStream<String> input = ...;
-
-Map<String, String> config = new HashMap<>();
-config.put("cluster.name", "my-cluster-name");
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1");
-
-List<InetSocketAddress> transportAddresses = new ArrayList<>();
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
- public IndexRequest createIndexRequest(String element) {
- Map<String, String> json = new HashMap<>();
- json.put("data", element);
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .source(json);
- }
-
- @Override
-    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
- indexer.add(createIndexRequest(element));
- }
-}));
-```
-{{< /tab >}}
{{< tab "java, Elasticsearch 6.x and above" >}}
Review comment:
This would then probably be 6.8.* ?
##########
File path: docs/content/docs/connectors/datastream/elasticsearch.md
##########
@@ -65,240 +61,90 @@ about how to package the program with the libraries for cluster execution.
Instructions for setting up an Elasticsearch cluster can be found
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
-Make sure to set and remember a cluster name. This must be set when
-creating an `ElasticsearchSink` for requesting document actions against your cluster.
## Elasticsearch Sink
-The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an
-Elasticsearch cluster.
-
The example below shows how to configure and create a sink:
{{< tabs "51732edd-4218-470e-adad-b1ebb4021ae4" >}}
-{{< tab "java, 5.x" >}}
-```java
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-DataStream<String> input = ...;
-
-Map<String, String> config = new HashMap<>();
-config.put("cluster.name", "my-cluster-name");
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1");
-
-List<InetSocketAddress> transportAddresses = new ArrayList<>();
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
- public IndexRequest createIndexRequest(String element) {
- Map<String, String> json = new HashMap<>();
- json.put("data", element);
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .source(json);
- }
-
- @Override
-    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
- indexer.add(createIndexRequest(element));
- }
-}));
-```
-{{< /tab >}}
{{< tab "java, Elasticsearch 6.x and above" >}}
```java
-import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-DataStream<String> input = ...;
-
-List<HttpHost> httpHosts = new ArrayList<>();
-httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
-httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
-
-// use a ElasticsearchSink.Builder to create an ElasticsearchSink
-ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
- httpHosts,
- new ElasticsearchSinkFunction<String>() {
- public IndexRequest createIndexRequest(String element) {
- Map<String, String> json = new HashMap<>();
- json.put("data", element);
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .source(json);
- }
-
- @Override
-    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
- indexer.add(createIndexRequest(element));
- }
- }
-);
-
-// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
-esSinkBuilder.setBulkFlushMaxActions(1);
-
-// provide a RestClientFactory for custom configuration on the internally created REST client
-esSinkBuilder.setRestClientFactory(
- restClientBuilder -> {
- restClientBuilder.setDefaultHeaders(...)
- restClientBuilder.setMaxRetryTimeoutMillis(...)
- restClientBuilder.setPathPrefix(...)
- restClientBuilder.setHttpClientConfigCallback(...)
- }
-);
-
-// finally, build and add the sink to the job's pipeline
-input.addSink(esSinkBuilder.build());
-```
-{{< /tab >}}
-{{< tab "scala, 5.x" >}}
-```scala
-import org.apache.flink.api.common.functions.RuntimeContext
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
-
-import org.elasticsearch.action.index.IndexRequest
-import org.elasticsearch.client.Requests
+...
-import java.net.InetAddress
-import java.net.InetSocketAddress
-import java.util.ArrayList
-import java.util.HashMap
-import java.util.List
-import java.util.Map
+DataStream<String> input = ...;
-val input: DataStream[String] = ...
+input.sinkTo(
+    new Elasticsearch6SinkBuilder<String>() // For Elasticsearch 7.x use Elasticsearch7SinkBuilder
+        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+ .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+ .setEmitter(
+ (element, context, indexer) ->
+ indexer.add(createIndexRequest(element)))
+ .build());
-val config = new java.util.HashMap[String, String]
-config.put("cluster.name", "my-cluster-name")
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1")
+...
-val transportAddresses = new java.util.ArrayList[InetSocketAddress]
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
+private static IndexRequest createIndexRequest(String element) {
+ Map<String, Object> json = new HashMap<>();
+ json.put("data", element);
-input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
- def createIndexRequest(element: String): IndexRequest = {
- val json = new java.util.HashMap[String, String]
- json.put("data", element)
-
return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .source(json)
- }
-}))
+ .index("my-index")
+ .type("my-type")
+ .id(element)
+ .source(json);
+}
```
{{< /tab >}}
{{< tab "scala, Elasticsearch 6.x and above" >}}
Review comment:
Also 6.8.* ?
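
For reference, a minimal sketch of the corresponding 7.x usage, assuming `Elasticsearch7SinkBuilder` exposes the same builder methods as the `Elasticsearch6SinkBuilder` shown in the diff above (host, index, and id values are placeholders, and the snippet keeps the docs' abbreviated `...` style):

```java
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

DataStream<String> input = ...;

input.sinkTo(
    new Elasticsearch7SinkBuilder<String>()
        // Emit after every element; otherwise actions are buffered into bulk requests
        .setBulkFlushMaxActions(1)
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
            (element, context, indexer) ->
                indexer.add(createIndexRequest(element)))
        .build());

private static IndexRequest createIndexRequest(String element) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);

    // Mapping types are deprecated in Elasticsearch 7.x, so no .type(...) call here
    return Requests.indexRequest()
        .index("my-index")
        .id(element)
        .source(json);
}
```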
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]