This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 432244c  [FLINK-24326][docs][connectors/elasticsearch] Update elasticsearch sink pages with new unified sink API (FLIP-143) implementation
432244c is described below

commit 432244c0c6aabff7d19cb6f694bb5b9040a5c26a
Author: Alexander Preuß <[email protected]>
AuthorDate: Fri Nov 26 14:30:55 2021 +0100

    [FLINK-24326][docs][connectors/elasticsearch] Update elasticsearch sink pages with new unified sink API (FLIP-143) implementation
---
 .../docs/connectors/datastream/elasticsearch.md    | 473 +++++++++------------
 .../content/docs/connectors/table/elasticsearch.md |  52 +--
 2 files changed, 212 insertions(+), 313 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/elasticsearch.md b/docs/content/docs/connectors/datastream/elasticsearch.md
index d066fd1..8fd6ab6 100644
--- a/docs/content/docs/connectors/datastream/elasticsearch.md
+++ b/docs/content/docs/connectors/datastream/elasticsearch.md
@@ -43,15 +43,11 @@ of the Elasticsearch installation:
   </thead>
   <tbody>
     <tr>
-        <td>5.x</td>
-        <td>{{< artifact flink-connector-elasticsearch5 >}}</td>
-    </tr>
-    <tr>
-        <td>6.x</td>
+        <td><= 6.3.1</td>
         <td>{{< artifact flink-connector-elasticsearch6 >}}</td>
     </tr>
     <tr>
-        <td>7 and later versions</td>
+        <td><= 7.5.1</td>
         <td>{{< artifact flink-connector-elasticsearch7 >}}</td>
     </tr>
   </tbody>
@@ -65,240 +61,149 @@ about how to package the program with the libraries for cluster execution.
 
 Instructions for setting up an Elasticsearch cluster can be found
 [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
-Make sure to set and remember a cluster name. This must be set when
-creating an `ElasticsearchSink` for requesting document actions against your cluster.
 
 ## Elasticsearch Sink
 
-The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an
-Elasticsearch cluster.
-
 The example below shows how to configure and create a sink:
 
 {{< tabs "51732edd-4218-470e-adad-b1ebb4021ae4" >}}
-{{< tab "java, 5.x" >}}
+{{< tab "Java" >}}
+Elasticsearch 6: 
 ```java
-import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
 
+import org.apache.http.HttpHost;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Requests;
 
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 DataStream<String> input = ...;
 
-Map<String, String> config = new HashMap<>();
-config.put("cluster.name", "my-cluster-name");
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1");
-
-List<InetSocketAddress> transportAddresses = new ArrayList<>();
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
-    public IndexRequest createIndexRequest(String element) {
-        Map<String, String> json = new HashMap<>();
-        json.put("data", element);
-    
-        return Requests.indexRequest()
-                .index("my-index")
-                .type("my-type")
-                .source(json);
-    }
-    
-    @Override
-    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-        indexer.add(createIndexRequest(element));
-    }
-}));```
-{{< /tab >}}
-{{< tab "java, Elasticsearch 6.x and above" >}}
+input.sinkTo(
+    new Elasticsearch6SinkBuilder<String>()
+        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+        (element, context, indexer) ->
+        indexer.add(createIndexRequest(element)))
+        .build());
+
+private static IndexRequest createIndexRequest(String element) {
+    Map<String, Object> json = new HashMap<>();
+    json.put("data", element);
+
+    return Requests.indexRequest()
+        .index("my-index")
+        .type("my-type")
+        .id(element)
+        .source(json);
+}
+```
+
+Elasticsearch 7:
 ```java
-import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
 
 import org.apache.http.HttpHost;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Requests;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 DataStream<String> input = ...;
 
-List<HttpHost> httpHosts = new ArrayList<>();
-httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
-httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
-
-// use a ElasticsearchSink.Builder to create an ElasticsearchSink
-ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
-    httpHosts,
-    new ElasticsearchSinkFunction<String>() {
-        public IndexRequest createIndexRequest(String element) {
-            Map<String, String> json = new HashMap<>();
-            json.put("data", element);
-        
-            return Requests.indexRequest()
-                    .index("my-index")
-                    .type("my-type")
-                    .source(json);
-        }
-        
-        @Override
-        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-            indexer.add(createIndexRequest(element));
-        }
-    }
-);
-
-// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
-esSinkBuilder.setBulkFlushMaxActions(1);
-
-// provide a RestClientFactory for custom configuration on the internally created REST client
-esSinkBuilder.setRestClientFactory(
-  restClientBuilder -> {
-    restClientBuilder.setDefaultHeaders(...)
-    restClientBuilder.setMaxRetryTimeoutMillis(...)
-    restClientBuilder.setPathPrefix(...)
-    restClientBuilder.setHttpClientConfigCallback(...)
-  }
-);
-
-// finally, build and add the sink to the job's pipeline
-input.addSink(esSinkBuilder.build());
+input.sinkTo(
+    new Elasticsearch7SinkBuilder<String>()
+        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+        (element, context, indexer) ->
+        indexer.add(createIndexRequest(element)))
+        .build());
+
+private static IndexRequest createIndexRequest(String element) {
+    Map<String, Object> json = new HashMap<>();
+    json.put("data", element);
+
+    return Requests.indexRequest()
+        .index("my-index")
+        .type("my-type")
+        .id(element)
+        .source(json);
+}
 ```
 {{< /tab >}}
-{{< tab "scala, 5.x" >}}
+{{< tab "Scala" >}}
+Elasticsearch 6:
 ```scala
-import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.api.connector.sink.SinkWriter
+import org.apache.flink.connector.elasticsearch.sink.{Elasticsearch6SinkBuilder, RequestIndexer}
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
-
+import org.apache.http.HttpHost
 import org.elasticsearch.action.index.IndexRequest
 import org.elasticsearch.client.Requests
 
-import java.net.InetAddress
-import java.net.InetSocketAddress
-import java.util.ArrayList
-import java.util.HashMap
-import java.util.List
-import java.util.Map
-
 val input: DataStream[String] = ...
 
-val config = new java.util.HashMap[String, String]
-config.put("cluster.name", "my-cluster-name")
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1")
+input.sinkTo(
+  new Elasticsearch6SinkBuilder[String]
+    .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
+    indexer.add(createIndexRequest(element)))
+    .build())
 
-val transportAddresses = new java.util.ArrayList[InetSocketAddress]
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
+def createIndexRequest(element: (String)): IndexRequest = {
 
-input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
-  def createIndexRequest(element: String): IndexRequest = {
-    val json = new java.util.HashMap[String, String]
-    json.put("data", element)
-    
-    return Requests.indexRequest()
-            .index("my-index")
-            .type("my-type")
-            .source(json)
-  }
-}))
+  val json = Map(
+    "data" -> element.asInstanceOf[AnyRef]
+  )
+
+  Requests.indexRequest.index("my-index").`type`("my-type").source(mapAsJavaMap(json))
+}
 ```
-{{< /tab >}}
-{{< tab "scala, Elasticsearch 6.x and above" >}}
+
+Elasticsearch 7:
 ```scala
-import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.api.connector.sink.SinkWriter
+import org.apache.flink.connector.elasticsearch.sink.{Elasticsearch7SinkBuilder, RequestIndexer}
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
-import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
-
 import org.apache.http.HttpHost
 import org.elasticsearch.action.index.IndexRequest
 import org.elasticsearch.client.Requests
 
-import java.util.ArrayList
-import java.util.List
-
 val input: DataStream[String] = ...
 
-val httpHosts = new java.util.ArrayList[HttpHost]
-httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
-httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))
-
-val esSinkBuilder = new ElasticsearchSink.Builder[String](
-  httpHosts,
-  new ElasticsearchSinkFunction[String] {
-     def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
-          val json = new java.util.HashMap[String, String]
-          json.put("data", element)
-
-          val rqst: IndexRequest = Requests.indexRequest
-            .index("my-index")
-            .`type`("my-type")
-            .source(json)
-
-          indexer.add(rqst)
-     } 
-  }
-)
-
-// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
-esSinkBuilder.setBulkFlushMaxActions(1)
-
-// provide a RestClientFactory for custom configuration on the internally created REST client
-esSinkBuilder.setRestClientFactory(new RestClientFactory {
-  override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
-       restClientBuilder.setDefaultHeaders(...)
-       restClientBuilder.setMaxRetryTimeoutMillis(...)
-       restClientBuilder.setPathPrefix(...)
-       restClientBuilder.setHttpClientConfigCallback(...)
-  }
-})
-
-// finally, build and add the sink to the job's pipeline
-input.addSink(esSinkBuilder.build)
+input.sinkTo(
+  new Elasticsearch7SinkBuilder[String]
+    .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
+    indexer.add(createIndexRequest(element)))
+    .build())
+
+def createIndexRequest(element: (String)): IndexRequest = {
+
+  val json = Map(
+    "data" -> element.asInstanceOf[AnyRef]
+  )
+
+  Requests.indexRequest.index("my-index").`type`("my-type").source(mapAsJavaMap(json))
+}
 ```
 {{< /tab >}}
 {{< /tabs >}}
 
-For Elasticsearch versions that still uses the now deprecated `TransportClient` to communicate
-with the Elasticsearch cluster (i.e., versions equal or below 5.x), note how a `Map` of `String`s
-is used to configure the `ElasticsearchSink`. This config map will be directly
-forwarded when creating the internally used `TransportClient`.
-The configuration keys are documented in the Elasticsearch documentation
-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
-Especially important is the `cluster.name` parameter that must correspond to
-the name of your cluster.
-
-For Elasticsearch 6.x and above, internally, the `RestHighLevelClient` is used for cluster communication.
-By default, the connector uses the default configurations for the REST client. To have custom
-configuration for the REST client, users can provide a `RestClientFactory` implementation when
-setting up the `ElasticsearchClient.Builder` that builds the sink.
-
-Also note that the example only demonstrates performing a single index
-request for each incoming element. Generally, the `ElasticsearchSinkFunction`
-can be used to perform multiple requests of different types (ex.,
+Note that the example only demonstrates performing a single index
+request for each incoming element. Generally, the `ElasticsearchEmitter`
+can be used to perform requests of different types (e.g.,
 `DeleteRequest`, `UpdateRequest`, etc.). 
 
 Internally, each parallel instance of the Flink Elasticsearch Sink uses
@@ -309,7 +214,10 @@ flushes of the buffered actions in progress.
 
 ### Elasticsearch Sinks and Fault Tolerance
 
-With Flink’s checkpointing enabled, the Flink Elasticsearch Sink guarantees
+By default, the Flink Elasticsearch Sink will not provide any strong delivery guarantees.
+Users have the option to enable at-least-once semantics for the Elasticsearch sink.
+
+With Flink’s checkpointing enabled, the Flink Elasticsearch Sink can guarantee
 at-least-once delivery of action requests to Elasticsearch clusters. It does
 so by waiting for all pending action requests in the `BulkProcessor` at the
 time of checkpoints. This effectively assures that all requests before the
@@ -318,144 +226,159 @@ proceeding to process more records sent to the sink.
 
 More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{< ref "docs/learn-flink/fault_tolerance" >}}).
 
-To use fault tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:
+To use fault tolerant Elasticsearch Sinks, at-least-once delivery has to be configured and checkpointing of the topology needs to be enabled at the execution environment:
 
 {{< tabs "d00d1e93-4844-40d7-b0ec-9ec37e73145e" >}}
 {{< tab "Java" >}}
+Elasticsearch 6:
+```java
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+
+Elasticsearch6SinkBuilder<String> sinkBuilder = new Elasticsearch6SinkBuilder<String>()
+    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter(
+    (element, context, indexer) -> 
+    indexer.add(createIndexRequest(element)));
+```
+
+Elasticsearch 7:
 ```java
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+
+Elasticsearch7SinkBuilder<String> sinkBuilder = new Elasticsearch7SinkBuilder<String>()
+    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter(
+    (element, context, indexer) -> 
+    indexer.add(createIndexRequest(element)));
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
+Elasticsearch 6:
 ```scala
 val env = StreamExecutionEnvironment.getExecutionEnvironment()
 env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+
+val sinkBuilder = new Elasticsearch6SinkBuilder[String]
+  .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+  .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+  .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
+  indexer.add(createIndexRequest(element)))
+```
+
+Elasticsearch 7:
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+
+val sinkBuilder = new Elasticsearch7SinkBuilder[String]
+  .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+  .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+  .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
+  indexer.add(createIndexRequest(element)))
 ```
 {{< /tab >}}
 {{< /tabs >}}
 
-<p style="border-radius: 5px; padding: 5px" class="bg-danger">
-<b>NOTE</b>: Users can disable flushing if they wish to do so, by calling
-<b>disableFlushOnCheckpoint()</b> on the created <b>ElasticsearchSink</b>. Be aware
-that this essentially means the sink will not provide any strong
-delivery guarantees anymore, even with checkpoint for the topology enabled.
-</p>
-
 ### Handling Failing Elasticsearch Requests
 
 Elasticsearch action requests may fail due to a variety of reasons, including
 temporarily saturated node queue capacity or malformed documents to be indexed.
-The Flink Elasticsearch Sink allows the user to specify how request
-failures are handled, by simply implementing an `ActionRequestFailureHandler` and
-providing it to the constructor.
+The Flink Elasticsearch Sink allows the user to retry requests by specifying a backoff policy.
 
 Below is an example:
 
 {{< tabs "ddb958b3-5dd5-476e-b946-ace3335628b2" >}}
 {{< tab "Java" >}}
+Elasticsearch 6:
 ```java
 DataStream<String> input = ...;
 
-input.addSink(new ElasticsearchSink<>(
-    config, transportAddresses,
-    new ElasticsearchSinkFunction<String>() {...},
-    new ActionRequestFailureHandler() {
-        @Override
-        void onFailure(ActionRequest action,
-                Throwable failure,
-                int restStatusCode,
-                RequestIndexer indexer) throw Throwable {
-
-            if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
-                // full queue; re-add document for indexing
-                indexer.add(action);
-            } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
-                // malformed document; simply drop request without failing sink
-            } else {
-                // for all other failures, fail the sink
-                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
-                throw failure;
-            }
-        }
-}));
+input.sinkTo(
+    new Elasticsearch6SinkBuilder<String>()
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+        (element, context, indexer) ->
+        indexer.add(createIndexRequest(element)))
+        // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
+        .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+        .build());
+```
+
+Elasticsearch 7:
+```java
+DataStream<String> input = ...;
+
+input.sinkTo(
+    new Elasticsearch7SinkBuilder<String>()
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+        (element, context, indexer) ->
+        indexer.add(createIndexRequest(element)))
+        // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
+        .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+        .build());
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
+Elasticsearch 6:
 ```scala
 val input: DataStream[String] = ...
 
-input.addSink(new ElasticsearchSink(
-    config, transportAddresses,
-    new ElasticsearchSinkFunction[String] {...},
-    new ActionRequestFailureHandler {
-        @throws(classOf[Throwable])
-        override def onFailure(ActionRequest action,
-                Throwable failure,
-                int restStatusCode,
-                RequestIndexer indexer) {
-
-            if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
-                // full queue; re-add document for indexing
-                indexer.add(action)
-            } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
-                // malformed document; simply drop request without failing sink
-            } else {
-                // for all other failures, fail the sink
-                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
-                throw failure
-            }
-        }
-}))
+input.sinkTo(
+  new Elasticsearch6SinkBuilder[String]
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
+    indexer.add(createIndexRequest(element)))
+    // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
+    .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+    .build())
 ```
-{{< /tab >}}
-{{< /tabs >}}
 
-The above example will let the sink re-add requests that failed due to
-queue capacity saturation and drop requests with malformed documents, without
-failing the sink. For all other failures, the sink will fail. If a `ActionRequestFailureHandler`
-is not provided to the constructor, the sink will fail for any kind of error.
+Elasticsearch 7:
+```scala
+val input: DataStream[String] = ...
 
-Note that `onFailure` is called for failures that still occur only after the
-`BulkProcessor` internally finishes all backoff retry attempts.
-By default, the `BulkProcessor` retries to a maximum of 8 attempts with
-an exponential backoff. For more information on the behaviour of the
-internal `BulkProcessor` and how to configure it, please see the following section.
+input.sinkTo(
+  new Elasticsearch7SinkBuilder[String]
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
+    indexer.add(createIndexRequest(element)))
+    // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
+    .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+    .build())
+```
+{{< /tab >}}
+{{< /tabs >}}
 
-By default, if a failure handler is not provided, the sink uses a
-`NoOpFailureHandler` that simply fails for all kinds of exceptions. The
-connector also provides a `RetryRejectedExecutionFailureHandler` implementation
-that always re-add requests that have failed due to queue capacity saturation.
+The above example will let the sink re-add requests that failed due to resource constraints (e.g.
+queue capacity saturation). For all other failures, such as malformed documents, the sink will fail.
+If no `BulkFlushBackoffStrategy` is configured (or `FlushBackoffType.NONE` is set), the sink will fail for any kind of error.
 
 <p style="border-radius: 5px; padding: 5px" class="bg-danger">
 <b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
 on failures will lead to longer checkpoints, as the sink will also
 need to wait for the re-added requests to be flushed when checkpointing.
-For example, when using <b>RetryRejectedExecutionFailureHandler</b>, checkpoints
+For example, when using <b>FlushBackoffType.EXPONENTIAL</b>, checkpoints
 will need to wait until Elasticsearch node queues have enough capacity for
-all the pending requests. This also means that if re-added requests never
-succeed, the checkpoint will never finish.
+all the pending requests, or until the maximum number of retries has been reached.
 </p>
 
 ### Configuring the Internal Bulk Processor
 
 The internal `BulkProcessor` can be further configured for its behaviour
-on how buffered action requests are flushed, by setting the following values in
-the provided `Map<String, String>`:
+on how buffered action requests are flushed, by using the following methods of the `Elasticsearch6SinkBuilder`:
 
- * **bulk.flush.max.actions**: Maximum amount of actions to buffer before flushing.
- * **bulk.flush.max.size.mb**: Maximum size of data (in megabytes) to buffer before flushing.
- * **bulk.flush.interval.ms**: Interval at which to flush regardless of the amount or size of buffered actions.
- 
-For versions 2.x and above, configuring how temporary request errors are
-retried is also supported:
+* **setBulkFlushMaxActions(int numMaxActions)**: Maximum number of actions to buffer before flushing.
+* **setBulkFlushMaxSizeMb(int maxSizeMb)**: Maximum size of data (in megabytes) to buffer before flushing.
+* **setBulkFlushInterval(long intervalMillis)**: Interval at which to flush regardless of the amount or size of buffered actions.
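
The flush thresholds in the list above can be pictured with a small, self-contained sketch. It is not the connector's or Elasticsearch's `BulkProcessor` code: the class `BulkBufferSketch` and its methods are hypothetical names, the interval-based flush is left out for brevity, and the exact comparison (`>=`) is an assumption. It only illustrates that buffered actions are flushed once either the action count or the buffered size reaches its limit.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of Flink or Elasticsearch.
final class BulkBufferSketch {
    private final int maxActions;
    private final long maxSizeBytes;
    private final List<String> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    private int flushCount = 0;

    BulkBufferSketch(int maxActions, int maxSizeMb) {
        this.maxActions = maxActions;
        this.maxSizeBytes = maxSizeMb * 1024L * 1024L;
    }

    // Buffers one action and flushes as soon as either threshold is reached.
    void add(String action) {
        buffer.add(action);
        bufferedBytes += action.getBytes(StandardCharsets.UTF_8).length;
        if (buffer.size() >= maxActions || bufferedBytes >= maxSizeBytes) {
            flush();
        }
    }

    // Stands in for sending one bulk request; here it only resets the buffer.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushCount++;
        buffer.clear();
        bufferedBytes = 0;
    }

    int flushCount() {
        return flushCount;
    }

    int pending() {
        return buffer.size();
    }

    public static void main(String[] args) {
        BulkBufferSketch sketch = new BulkBufferSketch(3, 5);
        for (int i = 0; i < 7; i++) {
            sketch.add("{\"data\":\"element-" + i + "\"}");
        }
        // prints "2 flushes, 1 pending"
        System.out.println(sketch.flushCount() + " flushes, " + sketch.pending() + " pending");
    }
}
```

With a limit of one action, as in `setBulkFlushMaxActions(1)` in the examples of this page, every `add` in this sketch would flush immediately.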
  
- * **bulk.flush.backoff.enable**: Whether or not to perform retries with backoff delay for a flush
- if one or more of its actions failed due to a temporary `EsRejectedExecutionException`.
- * **bulk.flush.backoff.type**: The type of backoff delay, either `CONSTANT` or `EXPONENTIAL`
- * **bulk.flush.backoff.delay**: The amount of delay for backoff. For constant backoff, this
- is simply the delay between each retry. For exponential backoff, this is the initial base delay.
- * **bulk.flush.backoff.retries**: The amount of backoff retries to attempt.
+Configuring how temporary request errors are retried is also supported:
+ * **setBulkFlushBackoffStrategy(FlushBackoffType flushBackoffType, int maxRetries, long delayMillis)**: Sets the type of backoff delay (either `CONSTANT` or `EXPONENTIAL`), the maximum number of backoff retries to attempt, and the backoff delay. For constant backoff, this
+   is simply the delay between each retry. For exponential backoff, this is the initial base delay.
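
To make the difference between the two backoff types concrete, here is a minimal sketch of the wait times they could produce. The class `BackoffDelaySketch` and its enum are hypothetical, and plain doubling is an assumption chosen for readability; the growth curve actually used by Elasticsearch's exponential backoff may differ, so treat the numbers as illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not the connector's or Elasticsearch's backoff code.
final class BackoffDelaySketch {

    enum BackoffType { NONE, CONSTANT, EXPONENTIAL }

    // Returns the wait time in milliseconds before each retry attempt.
    static List<Long> delays(BackoffType type, int maxRetries, long delayMillis) {
        List<Long> result = new ArrayList<>();
        if (type == BackoffType.NONE) {
            return result; // no retries at all
        }
        long current = delayMillis;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            result.add(current);
            if (type == BackoffType.EXPONENTIAL) {
                current *= 2; // assumption: simple doubling from the initial base delay
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [1000, 1000, 1000, 1000, 1000]
        System.out.println(delays(BackoffType.CONSTANT, 5, 1000));
        // prints [1000, 2000, 4000, 8000, 16000]
        System.out.println(delays(BackoffType.EXPONENTIAL, 5, 1000));
    }
}
```

The arguments `5` and `1000` mirror the `setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)` call shown earlier in this page.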
 
 More information about Elasticsearch can be found [here](https://elastic.co).
 
diff --git a/docs/content/docs/connectors/table/elasticsearch.md b/docs/content/docs/connectors/table/elasticsearch.md
index 64bc68a..8639858 100644
--- a/docs/content/docs/connectors/table/elasticsearch.md
+++ b/docs/content/docs/connectors/table/elasticsearch.md
@@ -129,28 +129,11 @@ Connector Options
       <td>Password used to connect to Elasticsearch instance. If <code>username</code> is configured, this option must be configured with non-empty string as well.</td>
     </tr>
     <tr>
-      <td><h5>failure-handler</h5></td>
+      <td><h5>sink.delivery-guarantee</h5></td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">fail</td>
+      <td style="word-wrap: break-word;">NONE</td>
       <td>String</td>
-      <td>Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are:
-      <ul>
-        <li><code>fail</code>: throws an exception if a request fails and thus causes a job failure.</li>
-        <li><code>ignore</code>: ignores failures and drops the request.</li>
-        <li><code>retry-rejected</code>: re-adds requests that have failed due to queue capacity saturation.</li>
-        <li>custom class name: for failure handling with a ActionRequestFailureHandler subclass.</li>
-      </ul>
-      </td>
-    </tr>
-    <tr>
-      <td><h5>sink.flush-on-checkpoint</h5></td>
-      <td>optional</td>
-      <td style="word-wrap: break-word;">true</td>
-      <td>Boolean</td>
-      <td>Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests
-       to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong
-       guarantees for at-least-once delivery of action requests.
-      </td>
+      <td>Optional delivery guarantee when committing. Valid values are <code>NONE</code> or <code>AT_LEAST_ONCE</code>.</td>
     </tr>
     <tr>
       <td><h5>sink.bulk-flush.max-actions</h5></td>
@@ -183,11 +166,11 @@ Connector Options
     <tr>
       <td><h5>sink.bulk-flush.backoff.strategy</h5></td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">DISABLED</td>
+      <td style="word-wrap: break-word;">NONE</td>
       <td>String</td>
       <td>Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are:
       <ul>
-        <li><code>DISABLED</code>: no retry performed, i.e. fail after the first request error.</li>
+        <li><code>NONE</code>: no retry performed, i.e. fail after the first request error.</li>
         <li><code>CONSTANT</code>: wait for backoff delay between retries.</li>
         <li><code>EXPONENTIAL</code>: initially wait for backoff delay and increase exponentially between retries.</li>
       </ul>
@@ -196,23 +179,16 @@ Connector Options
     <tr>
       <td><h5>sink.bulk-flush.backoff.max-retries</h5></td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">8</td>
+      <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>Maximum number of backoff retries.</td>
     </tr>
     <tr>
       <td><h5>sink.bulk-flush.backoff.delay</h5></td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">50ms</td>
-      <td>Duration</td>
-      <td>Delay between each backoff attempt. For <code>CONSTANT</code> backoff, this is simply the delay between each retry. For <code>EXPONENTIAL</code> backoff, this is the initial base delay.</td>
-    </tr>
-    <tr>
-      <td><h5>connection.max-retry-timeout</h5></td>
-      <td>optional</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Duration</td>
-      <td>Maximum timeout between retries.</td>
+      <td>Delay between each backoff attempt. For <code>CONSTANT</code> backoff, this is simply the delay between each retry. For <code>EXPONENTIAL</code> backoff, this is the initial base delay.</td>
     </tr>
     <tr>
       <td><h5>connection.path-prefix</h5></td>
@@ -238,20 +214,20 @@ Features
 
 ### Key Handling
 
-Elasticsearch sink can work in either upsert mode or append mode, it depends on whether primary key is defined.
-If primary key is defined, Elasticsearch sink works in upsert mode which can consume queries containing UPDATE/DELETE messages.
-If primary key is not defined, Elasticsearch sink works in append mode which can only consume queries containing INSERT only messages.
+The Elasticsearch sink can work in either upsert mode or append mode, depending on whether a primary key is defined.
+If a primary key is defined, the Elasticsearch sink works in upsert mode which can consume queries containing UPDATE/DELETE messages.
+If a primary key is not defined, the Elasticsearch sink works in append mode which can only consume queries containing INSERT only messages.
 
-In Elasticsearch connector, the primary key is used to calculate the Elasticsearch document id, which is a string of up to 512 bytes. It cannot have whitespaces.
+In the Elasticsearch connector, the primary key is used to calculate the Elasticsearch document id, which is a string of up to 512 bytes. It cannot contain whitespace.
 The Elasticsearch connector generates a document ID string for every row by concatenating all primary key fields in the order defined in the DDL using a key delimiter specified by `document-id.key-delimiter`.
-Certain types are not allowed as primary key field as they do not have a good string representation, e.g. `BYTES`, `ROW`, `ARRAY`, `MAP`, etc.
+Certain types are not allowed as a primary key field as they do not have a good string representation, e.g. `BYTES`, `ROW`, `ARRAY`, `MAP`, etc.
 If no primary key is specified, Elasticsearch will generate a document id automatically.
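
The document ID rule described above can be sketched in a few lines. This is only an illustration under assumptions: `DocumentIdSketch` and `documentId` are hypothetical names, values are turned into strings with `String.valueOf`, and the two checks simply mirror the 512-byte and no-whitespace constraints stated in the text rather than the connector's own validation.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration only; not the connector's key extraction code.
final class DocumentIdSketch {

    // Joins the primary key values, in DDL order, with the configured key delimiter.
    static String documentId(List<?> primaryKeyValues, String keyDelimiter) {
        String id = primaryKeyValues.stream()
                .map(value -> String.valueOf(value))
                .collect(Collectors.joining(keyDelimiter));
        if (id.getBytes(StandardCharsets.UTF_8).length > 512) {
            throw new IllegalArgumentException("document id exceeds 512 bytes");
        }
        if (id.chars().anyMatch(Character::isWhitespace)) {
            throw new IllegalArgumentException("document id must not contain whitespace");
        }
        return id;
    }

    public static void main(String[] args) {
        // prints "user42_7"
        System.out.println(documentId(List.of("user42", 7), "_"));
    }
}
```

Here `_` is passed explicitly as the value of `document-id.key-delimiter`; a row with primary key fields `user42` and `7` would then map to the document ID `user42_7`.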
 
-See [CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table) for more details about PRIMARY KEY syntax.
+See [CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table) for more details about the PRIMARY KEY syntax.
 
 ### Dynamic Index
 
-Elasticsearch sink supports both static index and dynamic index.
+The Elasticsearch sink supports both static index and dynamic index.
 
 If you want to have a static index, the `index` option value should be a plain string, e.g. `'myusers'`; all records will then be consistently written into the "myusers" index.
 
