MartijnVisser commented on a change in pull request #17930:
URL: https://github.com/apache/flink/pull/17930#discussion_r759666055



##########
File path: docs/content/docs/connectors/datastream/elasticsearch.md
##########
@@ -43,15 +43,11 @@ of the Elasticsearch installation:
   </thead>
   <tbody>
     <tr>
-        <td>5.x</td>
-        <td>{{< artifact flink-connector-elasticsearch5 >}}</td>
-    </tr>
-    <tr>
-        <td>6.x</td>
+        <td><= 6.3.1</td>
         <td>{{< artifact flink-connector-elasticsearch6 >}}</td>
     </tr>
     <tr>
-        <td>7 and later versions</td>
+        <td><= 7.5.1</td>

Review comment:
       Don't we support the entire 7.* series? 
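      If we do, it might be worth saying so explicitly. For reference, a sketch of the 7.x path with the new sink, mirroring the 6.x example added in this PR (whose own comment says to use `Elasticsearch7SinkBuilder` for Elasticsearch 7.x); host, index and helper names are illustrative:

```java
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

DataStream<String> input = ...;

input.sinkTo(
    new Elasticsearch7SinkBuilder<String>()
        // Emit after every element; otherwise requests are buffered
        .setBulkFlushMaxActions(1)
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
            (element, context, indexer) ->
                indexer.add(createIndexRequest(element)))
        .build());

private static IndexRequest createIndexRequest(String element) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);

    // No .type(...) on the 7.x line, where mapping types are deprecated
    return Requests.indexRequest()
        .index("my-index")
        .id(element)
        .source(json);
}
```

      The only real differences from the 6.x snippet would be the builder class and dropping `.type(...)` from the index request.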

##########
File path: docs/content/docs/connectors/datastream/elasticsearch.md
##########
@@ -43,15 +43,11 @@ of the Elasticsearch installation:
   </thead>
   <tbody>
     <tr>
-        <td>5.x</td>
-        <td>{{< artifact flink-connector-elasticsearch5 >}}</td>
-    </tr>
-    <tr>
-        <td>6.x</td>
+        <td><= 6.3.1</td>

Review comment:
      Shouldn't this be 6.8.* if we mirror Elasticsearch's EOL policy (https://www.elastic.co/support/eol)?
   
   `For these users, we maintain the last minor of the prior major release series. For example, with Elasticsearch 6.x, we are maintaining the 6.8.x series. This allows these users to obtain fixes while making only minor changes to their running software. This last minor will be maintained until the release of the second subsequent major version. For example, Elasticsearch 6.8.x will be maintained until the GA release of Elasticsearch 8.0.0. At the release of Elasticsearch 8.0.0, we will continue to maintain the last 7.x series, and begin maintaining the 8.0.x minor series, then 8.1.x series, then 8.2.x series of minor releases.`
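
   If we follow that policy, this row would presumably read something like `<td><= 6.8.x</td>` (6.8 being the last maintained 6.x minor) rather than `<td><= 6.3.1</td>`.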

##########
File path: docs/content/docs/connectors/datastream/elasticsearch.md
##########
@@ -65,240 +61,90 @@ about how to package the program with the libraries for cluster execution.
 
 Instructions for setting up an Elasticsearch cluster can be found
 [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
-Make sure to set and remember a cluster name. This must be set when
-creating an `ElasticsearchSink` for requesting document actions against your cluster.
 
 ## Elasticsearch Sink
 
-The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an
-Elasticsearch cluster.
-
 The example below shows how to configure and create a sink:
 
 {{< tabs "51732edd-4218-470e-adad-b1ebb4021ae4" >}}
-{{< tab "java, 5.x" >}}
-```java
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-DataStream<String> input = ...;
-
-Map<String, String> config = new HashMap<>();
-config.put("cluster.name", "my-cluster-name");
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1");
-
-List<InetSocketAddress> transportAddresses = new ArrayList<>();
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
-    public IndexRequest createIndexRequest(String element) {
-        Map<String, String> json = new HashMap<>();
-        json.put("data", element);
-    
-        return Requests.indexRequest()
-                .index("my-index")
-                .type("my-type")
-                .source(json);
-    }
-    
-    @Override
-    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-        indexer.add(createIndexRequest(element));
-    }
-}));
-```
-{{< /tab >}}
 {{< tab "java, Elasticsearch 6.x and above" >}}

Review comment:
       This would then probably be 6.8.* ?
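      (In that case the tab label here would presumably become something like `java, Elasticsearch 6.8.x and above`.)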

##########
File path: docs/content/docs/connectors/datastream/elasticsearch.md
##########
@@ -65,240 +61,90 @@ about how to package the program with the libraries for cluster execution.
 
 Instructions for setting up an Elasticsearch cluster can be found
 [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
-Make sure to set and remember a cluster name. This must be set when
-creating an `ElasticsearchSink` for requesting document actions against your cluster.
 
 ## Elasticsearch Sink
 
-The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an
-Elasticsearch cluster.
-
 The example below shows how to configure and create a sink:
 
 {{< tabs "51732edd-4218-470e-adad-b1ebb4021ae4" >}}
-{{< tab "java, 5.x" >}}
-```java
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-DataStream<String> input = ...;
-
-Map<String, String> config = new HashMap<>();
-config.put("cluster.name", "my-cluster-name");
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1");
-
-List<InetSocketAddress> transportAddresses = new ArrayList<>();
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
-    public IndexRequest createIndexRequest(String element) {
-        Map<String, String> json = new HashMap<>();
-        json.put("data", element);
-    
-        return Requests.indexRequest()
-                .index("my-index")
-                .type("my-type")
-                .source(json);
-    }
-    
-    @Override
-    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-        indexer.add(createIndexRequest(element));
-    }
-}));
-```
-{{< /tab >}}
 {{< tab "java, Elasticsearch 6.x and above" >}}
 ```java
-import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
 
 import org.apache.http.HttpHost;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Requests;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-DataStream<String> input = ...;
-
-List<HttpHost> httpHosts = new ArrayList<>();
-httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
-httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
-
-// use a ElasticsearchSink.Builder to create an ElasticsearchSink
-ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
-    httpHosts,
-    new ElasticsearchSinkFunction<String>() {
-        public IndexRequest createIndexRequest(String element) {
-            Map<String, String> json = new HashMap<>();
-            json.put("data", element);
-        
-            return Requests.indexRequest()
-                    .index("my-index")
-                    .type("my-type")
-                    .source(json);
-        }
-        
-        @Override
-        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-            indexer.add(createIndexRequest(element));
-        }
-    }
-);
-
-// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
-esSinkBuilder.setBulkFlushMaxActions(1);
-
-// provide a RestClientFactory for custom configuration on the internally created REST client
-esSinkBuilder.setRestClientFactory(
-  restClientBuilder -> {
-    restClientBuilder.setDefaultHeaders(...)
-    restClientBuilder.setMaxRetryTimeoutMillis(...)
-    restClientBuilder.setPathPrefix(...)
-    restClientBuilder.setHttpClientConfigCallback(...)
-  }
-);
-
-// finally, build and add the sink to the job's pipeline
-input.addSink(esSinkBuilder.build());
-```
-{{< /tab >}}
-{{< tab "scala, 5.x" >}}
-```scala
-import org.apache.flink.api.common.functions.RuntimeContext
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
-
-import org.elasticsearch.action.index.IndexRequest
-import org.elasticsearch.client.Requests
+...
 
-import java.net.InetAddress
-import java.net.InetSocketAddress
-import java.util.ArrayList
-import java.util.HashMap
-import java.util.List
-import java.util.Map
+DataStream<String> input = ...;
 
-val input: DataStream[String] = ...
+input.sinkTo(
+    new Elasticsearch6SinkBuilder<String>() // For Elasticsearch 7.x use Elasticsearch7SinkBuilder
+        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+        (element, context, indexer) ->
+        indexer.add(createIndexRequest(element)))
+        .build());
 
-val config = new java.util.HashMap[String, String]
-config.put("cluster.name", "my-cluster-name")
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1")
+...
 
-val transportAddresses = new java.util.ArrayList[InetSocketAddress]
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
+private static IndexRequest createIndexRequest(String element) {
+    Map<String, Object> json = new HashMap<>();
+    json.put("data", element);
 
-input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
-  def createIndexRequest(element: String): IndexRequest = {
-    val json = new java.util.HashMap[String, String]
-    json.put("data", element)
-    
     return Requests.indexRequest()
-            .index("my-index")
-            .type("my-type")
-            .source(json)
-  }
-}))
+        .index("my-index")
+        .type("my-type")
+        .id(element)
+        .source(json);
+}
 ```
 {{< /tab >}}
 {{< tab "scala, Elasticsearch 6.x and above" >}}

Review comment:
       Also 6.8.* ?



