This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 432244c [FLINK-24326][docs][connectors/elasticsearch] Update
elasticsearch sink pages with new unified sink API (FLIP-143) implementation
432244c is described below
commit 432244c0c6aabff7d19cb6f694bb5b9040a5c26a
Author: Alexander Preuß <[email protected]>
AuthorDate: Fri Nov 26 14:30:55 2021 +0100
[FLINK-24326][docs][connectors/elasticsearch] Update elasticsearch sink
pages with new unified sink API (FLIP-143) implementation
---
.../docs/connectors/datastream/elasticsearch.md | 473 +++++++++------------
.../content/docs/connectors/table/elasticsearch.md | 52 +--
2 files changed, 212 insertions(+), 313 deletions(-)
diff --git a/docs/content/docs/connectors/datastream/elasticsearch.md b/docs/content/docs/connectors/datastream/elasticsearch.md
index d066fd1..8fd6ab6 100644
--- a/docs/content/docs/connectors/datastream/elasticsearch.md
+++ b/docs/content/docs/connectors/datastream/elasticsearch.md
@@ -43,15 +43,11 @@ of the Elasticsearch installation:
</thead>
<tbody>
<tr>
- <td>5.x</td>
- <td>{{< artifact flink-connector-elasticsearch5 >}}</td>
- </tr>
- <tr>
- <td>6.x</td>
+ <td><= 6.3.1</td>
<td>{{< artifact flink-connector-elasticsearch6 >}}</td>
</tr>
<tr>
- <td>7 and later versions</td>
+ <td><= 7.5.1</td>
<td>{{< artifact flink-connector-elasticsearch7 >}}</td>
</tr>
</tbody>
@@ -65,240 +61,149 @@ about how to package the program with the libraries for cluster execution.
Instructions for setting up an Elasticsearch cluster can be found
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
-Make sure to set and remember a cluster name. This must be set when
-creating an `ElasticsearchSink` for requesting document actions against your cluster.
## Elasticsearch Sink
-The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an
-Elasticsearch cluster.
-
The example below shows how to configure and create a sink:
{{< tabs "51732edd-4218-470e-adad-b1ebb4021ae4" >}}
-{{< tab "java, 5.x" >}}
+{{< tab "Java" >}}
+Elasticsearch 6:
```java
-import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
DataStream<String> input = ...;
-Map<String, String> config = new HashMap<>();
-config.put("cluster.name", "my-cluster-name");
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1");
-
-List<InetSocketAddress> transportAddresses = new ArrayList<>();
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
- public IndexRequest createIndexRequest(String element) {
- Map<String, String> json = new HashMap<>();
- json.put("data", element);
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .source(json);
- }
-
- @Override
- public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
- indexer.add(createIndexRequest(element));
- }
-}));```
-{{< /tab >}}
-{{< tab "java, Elasticsearch 6.x and above" >}}
+input.sinkTo(
+    new Elasticsearch6SinkBuilder<String>()
+        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+            (element, context, indexer) ->
+                indexer.add(createIndexRequest(element)))
+        .build());
+
+private static IndexRequest createIndexRequest(String element) {
+    Map<String, Object> json = new HashMap<>();
+    json.put("data", element);
+
+    return Requests.indexRequest()
+        .index("my-index")
+        .type("my-type")
+        .id(element)
+        .source(json);
+}
+```
+
+Elasticsearch 7:
```java
-import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
DataStream<String> input = ...;
-List<HttpHost> httpHosts = new ArrayList<>();
-httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
-httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
-
-// use a ElasticsearchSink.Builder to create an ElasticsearchSink
-ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
- httpHosts,
- new ElasticsearchSinkFunction<String>() {
- public IndexRequest createIndexRequest(String element) {
- Map<String, String> json = new HashMap<>();
- json.put("data", element);
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .source(json);
- }
-
- @Override
- public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
- indexer.add(createIndexRequest(element));
- }
- }
-);
-
-// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
-esSinkBuilder.setBulkFlushMaxActions(1);
-
-// provide a RestClientFactory for custom configuration on the internally created REST client
-esSinkBuilder.setRestClientFactory(
- restClientBuilder -> {
- restClientBuilder.setDefaultHeaders(...)
- restClientBuilder.setMaxRetryTimeoutMillis(...)
- restClientBuilder.setPathPrefix(...)
- restClientBuilder.setHttpClientConfigCallback(...)
- }
-);
-
-// finally, build and add the sink to the job's pipeline
-input.addSink(esSinkBuilder.build());
+input.sinkTo(
+    new Elasticsearch7SinkBuilder<String>()
+        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+            (element, context, indexer) ->
+                indexer.add(createIndexRequest(element)))
+        .build());
+
+private static IndexRequest createIndexRequest(String element) {
+    Map<String, Object> json = new HashMap<>();
+    json.put("data", element);
+
+    // Mapping types are deprecated in Elasticsearch 7, so no type is set here
+    return Requests.indexRequest()
+        .index("my-index")
+        .id(element)
+        .source(json);
+}
```
{{< /tab >}}
-{{< tab "scala, 5.x" >}}
+{{< tab "Scala" >}}
+Elasticsearch 6:
```scala
-import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.api.connector.sink.SinkWriter
+import org.apache.flink.connector.elasticsearch.sink.{Elasticsearch6SinkBuilder, RequestIndexer}
import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
-import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
-
+import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
-import java.net.InetAddress
-import java.net.InetSocketAddress
-import java.util.ArrayList
-import java.util.HashMap
-import java.util.List
-import java.util.Map
-
val input: DataStream[String] = ...
-val config = new java.util.HashMap[String, String]
-config.put("cluster.name", "my-cluster-name")
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1")
+input.sinkTo(
+  new Elasticsearch6SinkBuilder[String]
+    .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
+      indexer.add(createIndexRequest(element)))
+    .build())
-val transportAddresses = new java.util.ArrayList[InetSocketAddress]
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
-transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
+def createIndexRequest(element: String): IndexRequest = {
-input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
- def createIndexRequest(element: String): IndexRequest = {
- val json = new java.util.HashMap[String, String]
- json.put("data", element)
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .source(json)
- }
-}))
+  // A java.util.Map is passed directly, so no Scala-to-Java collection conversion is needed
+  val json = new java.util.HashMap[String, String]
+  json.put("data", element)
+
+  Requests.indexRequest
+    .index("my-index")
+    .`type`("my-type")
+    .id(element)
+    .source(json)
+}
```
-{{< /tab >}}
-{{< tab "scala, Elasticsearch 6.x and above" >}}
+
+Elasticsearch 7:
```scala
-import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.api.connector.sink.SinkWriter
+import org.apache.flink.connector.elasticsearch.sink.{Elasticsearch7SinkBuilder, RequestIndexer}
import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
-import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
-
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
-import java.util.ArrayList
-import java.util.List
-
val input: DataStream[String] = ...
-val httpHosts = new java.util.ArrayList[HttpHost]
-httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
-httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))
-
-val esSinkBuilder = new ElasticsearchSink.Builder[String](
- httpHosts,
- new ElasticsearchSinkFunction[String] {
- def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
- val json = new java.util.HashMap[String, String]
- json.put("data", element)
-
- val rqst: IndexRequest = Requests.indexRequest
- .index("my-index")
- .`type`("my-type")
- .source(json)
-
- indexer.add(rqst)
- }
- }
-)
-
-// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
-esSinkBuilder.setBulkFlushMaxActions(1)
-
-// provide a RestClientFactory for custom configuration on the internally created REST client
-esSinkBuilder.setRestClientFactory(new RestClientFactory {
- override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
- restClientBuilder.setDefaultHeaders(...)
- restClientBuilder.setMaxRetryTimeoutMillis(...)
- restClientBuilder.setPathPrefix(...)
- restClientBuilder.setHttpClientConfigCallback(...)
- }
-})
-
-// finally, build and add the sink to the job's pipeline
-input.addSink(esSinkBuilder.build)
+input.sinkTo(
+  new Elasticsearch7SinkBuilder[String]
+    .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
+      indexer.add(createIndexRequest(element)))
+    .build())
+
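+def createIndexRequest(element: String): IndexRequest = {
+  // A java.util.Map is passed directly, so no Scala-to-Java collection conversion is needed
+  val json = new java.util.HashMap[String, String]
+  json.put("data", element)
+
+  // Mapping types are deprecated in Elasticsearch 7, so no type is set here
+  Requests.indexRequest
+    .index("my-index")
+    .id(element)
+    .source(json)
+}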
```
{{< /tab >}}
{{< /tabs >}}
-For Elasticsearch versions that still uses the now deprecated `TransportClient` to communicate
-with the Elasticsearch cluster (i.e., versions equal or below 5.x), note how a `Map` of `String`s
-is used to configure the `ElasticsearchSink`. This config map will be directly
-forwarded when creating the internally used `TransportClient`.
-The configuration keys are documented in the Elasticsearch documentation
-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
-Especially important is the `cluster.name` parameter that must correspond to
-the name of your cluster.
-
-For Elasticsearch 6.x and above, internally, the `RestHighLevelClient` is used for cluster communication.
-By default, the connector uses the default configurations for the REST client. To have custom
-configuration for the REST client, users can provide a `RestClientFactory` implementation when
-setting up the `ElasticsearchClient.Builder` that builds the sink.
-
-Also note that the example only demonstrates performing a single index
-request for each incoming element. Generally, the `ElasticsearchSinkFunction`
-can be used to perform multiple requests of different types (ex.,
+Note that the example only demonstrates performing a single index
+request for each incoming element. Generally, the `ElasticsearchEmitter`
+can be used to perform requests of different types (e.g.,
`DeleteRequest`, `UpdateRequest`, etc.).
Internally, each parallel instance of the Flink Elasticsearch Sink uses
@@ -309,7 +214,10 @@ flushes of the buffered actions in progress.
### Elasticsearch Sinks and Fault Tolerance
-With Flink’s checkpointing enabled, the Flink Elasticsearch Sink guarantees
+By default, the Flink Elasticsearch Sink will not provide any strong delivery guarantees.
+Users have the option to enable at-least-once semantics for the Elasticsearch sink.
+
+With Flink’s checkpointing enabled, the Flink Elasticsearch Sink can guarantee
at-least-once delivery of action requests to Elasticsearch clusters. It does
so by waiting for all pending action requests in the `BulkProcessor` at the
time of checkpoints. This effectively assures that all requests before the
@@ -318,144 +226,159 @@ proceeding to process more records sent to the sink.
More details on checkpoints and fault tolerance are in the [fault tolerance
docs]({{< ref "docs/learn-flink/fault_tolerance" >}}).
-To use fault tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:
+To use fault tolerant Elasticsearch Sinks, at-least-once delivery has to be configured and checkpointing of the topology needs to be enabled at the execution environment:
{{< tabs "d00d1e93-4844-40d7-b0ec-9ec37e73145e" >}}
{{< tab "Java" >}}
+Elasticsearch 6:
+```java
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+
+Elasticsearch6SinkBuilder<String> sinkBuilder = new Elasticsearch6SinkBuilder<String>()
+    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter(
+        (element, context, indexer) ->
+            indexer.add(createIndexRequest(element)));
+```
+
+Elasticsearch 7:
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+
+Elasticsearch7SinkBuilder<String> sinkBuilder = new Elasticsearch7SinkBuilder<String>()
+    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter(
+        (element, context, indexer) ->
+            indexer.add(createIndexRequest(element)));
```
{{< /tab >}}
{{< tab "Scala" >}}
+Elasticsearch 6:
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+
+val sinkBuilder = new Elasticsearch6SinkBuilder[String]
+  .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+  .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+  .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
+    indexer.add(createIndexRequest(element)))
+```
+
+Elasticsearch 7:
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+
+val sinkBuilder = new Elasticsearch7SinkBuilder[String]
+  .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+  .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+  .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
+    indexer.add(createIndexRequest(element)))
```
{{< /tab >}}
{{< /tabs >}}
-<p style="border-radius: 5px; padding: 5px" class="bg-danger">
-<b>NOTE</b>: Users can disable flushing if they wish to do so, by calling
-<b>disableFlushOnCheckpoint()</b> on the created <b>ElasticsearchSink</b>. Be aware
-that this essentially means the sink will not provide any strong
-delivery guarantees anymore, even with checkpoint for the topology enabled.
-</p>
-
### Handling Failing Elasticsearch Requests
Elasticsearch action requests may fail due to a variety of reasons, including
temporarily saturated node queue capacity or malformed documents to be indexed.
-The Flink Elasticsearch Sink allows the user to specify how request
-failures are handled, by simply implementing an `ActionRequestFailureHandler` and
-providing it to the constructor.
+The Flink Elasticsearch Sink allows the user to retry requests by specifying a backoff policy.
Below is an example:
{{< tabs "ddb958b3-5dd5-476e-b946-ace3335628b2" >}}
{{< tab "Java" >}}
+Elasticsearch 6:
```java
DataStream<String> input = ...;
-input.addSink(new ElasticsearchSink<>(
- config, transportAddresses,
- new ElasticsearchSinkFunction<String>() {...},
- new ActionRequestFailureHandler() {
- @Override
- void onFailure(ActionRequest action,
- Throwable failure,
- int restStatusCode,
- RequestIndexer indexer) throw Throwable {
-
- if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
- // full queue; re-add document for indexing
- indexer.add(action);
- } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
- // malformed document; simply drop request without failing sink
- } else {
- // for all other failures, fail the sink
- // here the failure is simply rethrown, but users can also choose to throw custom exceptions
- throw failure;
- }
- }
-}));
+input.sinkTo(
+    new Elasticsearch6SinkBuilder<String>()
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+            (element, context, indexer) ->
+                indexer.add(createIndexRequest(element)))
+        // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
+        .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+        .build());
+```
+
+Elasticsearch 7:
+```java
+DataStream<String> input = ...;
+
+input.sinkTo(
+    new Elasticsearch7SinkBuilder<String>()
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+            (element, context, indexer) ->
+                indexer.add(createIndexRequest(element)))
+        // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
+        .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+        .build());
```
{{< /tab >}}
{{< tab "Scala" >}}
+Elasticsearch 6:
```scala
val input: DataStream[String] = ...
-input.addSink(new ElasticsearchSink(
- config, transportAddresses,
- new ElasticsearchSinkFunction[String] {...},
- new ActionRequestFailureHandler {
- @throws(classOf[Throwable])
- override def onFailure(ActionRequest action,
- Throwable failure,
- int restStatusCode,
- RequestIndexer indexer) {
-
- if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
- // full queue; re-add document for indexing
- indexer.add(action)
- } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
- // malformed document; simply drop request without failing sink
- } else {
- // for all other failures, fail the sink
- // here the failure is simply rethrown, but users can also choose to throw custom exceptions
- throw failure
- }
- }
-}))
+input.sinkTo(
+  new Elasticsearch6SinkBuilder[String]
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
+      indexer.add(createIndexRequest(element)))
+    // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
+    .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+    .build())
```
-{{< /tab >}}
-{{< /tabs >}}
-The above example will let the sink re-add requests that failed due to
-queue capacity saturation and drop requests with malformed documents, without
-failing the sink. For all other failures, the sink will fail. If a `ActionRequestFailureHandler`
-is not provided to the constructor, the sink will fail for any kind of error.
+Elasticsearch 7:
+```scala
+val input: DataStream[String] = ...
-Note that `onFailure` is called for failures that still occur only after the
-`BulkProcessor` internally finishes all backoff retry attempts.
-By default, the `BulkProcessor` retries to a maximum of 8 attempts with
-an exponential backoff. For more information on the behaviour of the
-internal `BulkProcessor` and how to configure it, please see the following section.
+input.sinkTo(
+  new Elasticsearch7SinkBuilder[String]
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
+      indexer.add(createIndexRequest(element)))
+    // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
+    .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+    .build())
+```
+{{< /tab >}}
+{{< /tabs >}}
-By default, if a failure handler is not provided, the sink uses a
-`NoOpFailureHandler` that simply fails for all kinds of exceptions. The
-connector also provides a `RetryRejectedExecutionFailureHandler` implementation
-that always re-add requests that have failed due to queue capacity saturation.
+The above example will let the sink re-add requests that failed due to resource constraints (e.g.
+queue capacity saturation). For all other failures, such as malformed documents, the sink will fail.
+If no bulk flush backoff strategy is configured (or `FlushBackoffType.NONE` is used), the sink will fail for any kind of error.
<p style="border-radius: 5px; padding: 5px" class="bg-danger">
<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
on failures will lead to longer checkpoints, as the sink will also
need to wait for the re-added requests to be flushed when checkpointing.
-For example, when using <b>RetryRejectedExecutionFailureHandler</b>, checkpoints
+For example, when using <b>FlushBackoffType.EXPONENTIAL</b>, checkpoints
will need to wait until Elasticsearch node queues have enough capacity for
-all the pending requests. This also means that if re-added requests never
-succeed, the checkpoint will never finish.
+all the pending requests, or until the maximum number of retries has been reached.
</p>
### Configuring the Internal Bulk Processor
The internal `BulkProcessor` can be further configured for its behaviour
-on how buffered action requests are flushed, by setting the following values in
-the provided `Map<String, String>`:
+on how buffered action requests are flushed, by using the following methods of the `Elasticsearch6SinkBuilder` and `Elasticsearch7SinkBuilder`:
- * **bulk.flush.max.actions**: Maximum amount of actions to buffer before flushing.
- * **bulk.flush.max.size.mb**: Maximum size of data (in megabytes) to buffer before flushing.
- * **bulk.flush.interval.ms**: Interval at which to flush regardless of the amount or size of buffered actions.
-
-For versions 2.x and above, configuring how temporary request errors are
-retried is also supported:
+* **setBulkFlushMaxActions(int numMaxActions)**: Maximum number of actions to buffer before flushing.
+* **setBulkFlushMaxSizeMb(int maxSizeMb)**: Maximum size of data (in megabytes) to buffer before flushing.
+* **setBulkFlushInterval(long intervalMillis)**: Interval at which to flush regardless of the number or size of buffered actions.
- * **bulk.flush.backoff.enable**: Whether or not to perform retries with backoff delay for a flush
- if one or more of its actions failed due to a temporary `EsRejectedExecutionException`.
- * **bulk.flush.backoff.type**: The type of backoff delay, either `CONSTANT` or `EXPONENTIAL`
- * **bulk.flush.backoff.delay**: The amount of delay for backoff. For constant backoff, this
- is simply the delay between each retry. For exponential backoff, this is the initial base delay.
- * **bulk.flush.backoff.retries**: The amount of backoff retries to attempt.
+Configuring how temporary request errors are retried is also supported:
+* **setBulkFlushBackoffStrategy(FlushBackoffType flushBackoffType, int maxRetries, long delayMillis)**: The type of backoff delay (`CONSTANT` or `EXPONENTIAL`), the maximum number of backoff retries to attempt, and the backoff delay in milliseconds. For constant backoff, the delay is simply the time between each retry. For exponential backoff, it is the initial base delay.
More information about Elasticsearch can be found [here](https://elastic.co).
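The bulk-flush thresholds, the checkpoint flush behind `AT_LEAST_ONCE`, and the `NONE`/`CONSTANT`/`EXPONENTIAL` backoff strategies described in the updated page can be illustrated with a small, self-contained model. This is a hypothetical sketch: the class `BulkFlushSketch`, its methods, and the simple doubling used for `EXPONENTIAL` are assumptions for illustration only. It is not the connector's code or Elasticsearch's `BulkProcessor`, which, for example, flushes on a scheduled timer instead of checking the interval on each `add`, and computes exponential delays with its own formula.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of the bulk-flush and backoff settings.
 * Not Flink's or Elasticsearch's implementation; all names are illustrative.
 */
public class BulkFlushSketch {

    /** Mirrors the three backoff strategy names used by the connector. */
    enum Backoff { NONE, CONSTANT, EXPONENTIAL }

    private final int maxActions;       // cf. setBulkFlushMaxActions
    private final long maxSizeBytes;    // cf. setBulkFlushMaxSizeMb, converted to bytes
    private final long intervalMillis;  // cf. setBulkFlushInterval
    private final List<String> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    private long lastFlushMillis;
    private int flushCount = 0;

    BulkFlushSketch(int maxActions, int maxSizeMb, long intervalMillis, long nowMillis) {
        this.maxActions = maxActions;
        this.maxSizeBytes = maxSizeMb * 1024L * 1024L;
        this.intervalMillis = intervalMillis;
        this.lastFlushMillis = nowMillis;
    }

    /** Buffers one action and flushes if any threshold is reached; returns true on flush. */
    boolean add(String action, long nowMillis) {
        buffer.add(action);
        bufferedBytes += action.length(); // crude size estimate, enough for a sketch
        // Simplification: the interval is checked on add; a real processor uses a timer.
        boolean due = buffer.size() >= maxActions
                || bufferedBytes >= maxSizeBytes
                || nowMillis - lastFlushMillis >= intervalMillis;
        if (due) {
            flush(nowMillis);
        }
        return due;
    }

    /** Models AT_LEAST_ONCE: a checkpoint only completes once the buffer is drained. */
    void onCheckpoint(long nowMillis) {
        if (!buffer.isEmpty()) {
            flush(nowMillis);
        }
    }

    int bufferedActions() { return buffer.size(); }

    int flushCount() { return flushCount; }

    private void flush(long nowMillis) {
        buffer.clear(); // a real sink would send one bulk request here and await the response
        bufferedBytes = 0;
        lastFlushMillis = nowMillis;
        flushCount++;
    }

    /**
     * Delay before each retry. CONSTANT repeats delayMillis; EXPONENTIAL starts at
     * delayMillis and doubles (the doubling is an assumption, not Elasticsearch's formula).
     */
    static List<Long> backoffDelays(Backoff type, int maxRetries, long delayMillis) {
        List<Long> delays = new ArrayList<>();
        if (type == Backoff.NONE) {
            return delays; // no retries: fail after the first request error
        }
        for (int i = 0; i < maxRetries; i++) {
            delays.add(type == Backoff.CONSTANT ? delayMillis : delayMillis << i);
        }
        return delays;
    }

    public static void main(String[] args) {
        BulkFlushSketch sink = new BulkFlushSketch(3, 5, 1_000, 0);
        sink.add("a", 10);
        sink.add("b", 20);
        System.out.println(sink.bufferedActions()); // 2: below every threshold
        sink.add("c", 30);
        System.out.println(sink.bufferedActions()); // 0: max actions (3) reached
        sink.add("d", 40);
        sink.onCheckpoint(50);
        System.out.println(sink.bufferedActions()); // 0: checkpoint drained the buffer
        System.out.println(backoffDelays(Backoff.EXPONENTIAL, 5, 1000)); // [1000, 2000, 4000, 8000, 16000]
    }
}
```

The model makes two points from the page concrete: with `setBulkFlushMaxActions(1)` every element triggers a flush, and with retries enabled a checkpoint may have to wait through the whole backoff schedule before the buffer is drained.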
diff --git a/docs/content/docs/connectors/table/elasticsearch.md b/docs/content/docs/connectors/table/elasticsearch.md
index 64bc68a..8639858 100644
--- a/docs/content/docs/connectors/table/elasticsearch.md
+++ b/docs/content/docs/connectors/table/elasticsearch.md
@@ -129,28 +129,11 @@ Connector Options
<td>Password used to connect to Elasticsearch instance. If <code>username</code> is configured, this option must be configured with non-empty string as well.</td>
</tr>
<tr>
- <td><h5>failure-handler</h5></td>
+ <td><h5>sink.delivery-guarantee</h5></td>
<td>optional</td>
- <td style="word-wrap: break-word;">fail</td>
+ <td style="word-wrap: break-word;">NONE</td>
<td>String</td>
- <td>Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are:
- <ul>
- <li><code>fail</code>: throws an exception if a request fails and thus causes a job failure.</li>
- <li><code>ignore</code>: ignores failures and drops the request.</li>
- <li><code>retry-rejected</code>: re-adds requests that have failed due to queue capacity saturation.</li>
- <li>custom class name: for failure handling with a ActionRequestFailureHandler subclass.</li>
- </ul>
- </td>
- </tr>
- <tr>
- <td><h5>sink.flush-on-checkpoint</h5></td>
- <td>optional</td>
- <td style="word-wrap: break-word;">true</td>
- <td>Boolean</td>
- <td>Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests
- to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong
- guarantees for at-least-once delivery of action requests.
- </td>
+ <td>Optional delivery guarantee when committing. Valid values are <code>NONE</code> or <code>AT_LEAST_ONCE</code>.</td>
</tr>
<tr>
<td><h5>sink.bulk-flush.max-actions</h5></td>
@@ -183,11 +166,11 @@ Connector Options
<tr>
<td><h5>sink.bulk-flush.backoff.strategy</h5></td>
<td>optional</td>
- <td style="word-wrap: break-word;">DISABLED</td>
+ <td style="word-wrap: break-word;">NONE</td>
<td>String</td>
<td>Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are:
<ul>
- <li><code>DISABLED</code>: no retry performed, i.e. fail after the first request error.</li>
+ <li><code>NONE</code>: no retry performed, i.e. fail after the first request error.</li>
<li><code>CONSTANT</code>: wait for backoff delay between retries.</li>
<li><code>EXPONENTIAL</code>: initially wait for backoff delay and increase exponentially between retries.</li>
</ul>
@@ -196,23 +179,16 @@ Connector Options
<tr>
<td><h5>sink.bulk-flush.backoff.max-retries</h5></td>
<td>optional</td>
- <td style="word-wrap: break-word;">8</td>
+ <td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Maximum number of backoff retries.</td>
</tr>
<tr>
<td><h5>sink.bulk-flush.backoff.delay</h5></td>
<td>optional</td>
- <td style="word-wrap: break-word;">50ms</td>
- <td>Duration</td>
- <td>Delay between each backoff attempt. For <code>CONSTANT</code> backoff, this is simply the delay between each retry. For <code>EXPONENTIAL</code> backoff, this is the initial base delay.</td>
- </tr>
- <tr>
- <td><h5>connection.max-retry-timeout</h5></td>
- <td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
- <td>Maximum timeout between retries.</td>
+ <td>Delay between each backoff attempt. For <code>CONSTANT</code> backoff, this is simply the delay between each retry. For <code>EXPONENTIAL</code> backoff, this is the initial base delay.</td>
</tr>
<tr>
<td><h5>connection.path-prefix</h5></td>
@@ -238,20 +214,20 @@ Features
### Key Handling
-Elasticsearch sink can work in either upsert mode or append mode, it depends on whether primary key is defined.
-If primary key is defined, Elasticsearch sink works in upsert mode which can consume queries containing UPDATE/DELETE messages.
-If primary key is not defined, Elasticsearch sink works in append mode which can only consume queries containing INSERT only messages.
+The Elasticsearch sink can work in either upsert mode or append mode, depending on whether a primary key is defined.
+If a primary key is defined, the Elasticsearch sink works in upsert mode, which can consume queries containing UPDATE/DELETE messages.
+If a primary key is not defined, the Elasticsearch sink works in append mode, which can only consume queries containing INSERT-only messages.
-In Elasticsearch connector, the primary key is used to calculate the Elasticsearch document id, which is a string of up to 512 bytes. It cannot have whitespaces.
+In the Elasticsearch connector, the primary key is used to calculate the Elasticsearch document id, which is a string of up to 512 bytes. It cannot contain whitespace.
The Elasticsearch connector generates a document ID string for every row by
concatenating all primary key fields in the order defined in the DDL using a
key delimiter specified by `document-id.key-delimiter`.
-Certain types are not allowed as primary key field as they do not have a good string representation, e.g. `BYTES`, `ROW`, `ARRAY`, `MAP`, etc.
+Certain types are not allowed as a primary key field as they do not have a good string representation, e.g. `BYTES`, `ROW`, `ARRAY`, `MAP`, etc.
If no primary key is specified, Elasticsearch will generate a document id automatically.
-See [CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table) for more details about PRIMARY KEY syntax.
+See [CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table) for more details about the PRIMARY KEY syntax.
### Dynamic Index
-Elasticsearch sink supports both static index and dynamic index.
+The Elasticsearch sink supports both static index and dynamic index.
If you want to have a static index, the `index` option value should be a plain string, e.g. `'myusers'`, all the records will be consistently written into "myusers" index.
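The key-handling rules in the table connector page (primary key fields concatenated in DDL order using `document-id.key-delimiter`, with an id of at most 512 bytes and no whitespace) can be sketched in plain Java. This is a hypothetical illustration: `DocumentIdSketch.documentId` is not part of the connector, `String.valueOf` stands in for whatever per-type formatting the connector actually applies, and the example key values and `_` delimiter are assumptions. The sketch simply rejects ids that break the stated rules; the real connector may report such cases differently.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.StringJoiner;

/**
 * Hypothetical sketch of deriving a document id from primary key values,
 * following the rules described for the table connector. Not the connector's code.
 */
public class DocumentIdSketch {

    static final int MAX_ID_BYTES = 512;

    /** Joins the key values in DDL order with the configured key delimiter. */
    static String documentId(List<?> primaryKeyValues, String keyDelimiter) {
        StringJoiner joiner = new StringJoiner(keyDelimiter);
        for (Object value : primaryKeyValues) {
            joiner.add(String.valueOf(value)); // stand-in for per-type string conversion
        }
        String id = joiner.toString();
        // The documented constraints: at most 512 bytes and no whitespace.
        if (id.getBytes(StandardCharsets.UTF_8).length > MAX_ID_BYTES) {
            throw new IllegalArgumentException("document id exceeds 512 bytes");
        }
        if (id.chars().anyMatch(Character::isWhitespace)) {
            throw new IllegalArgumentException("document id contains whitespace: '" + id + "'");
        }
        return id;
    }

    public static void main(String[] args) {
        // e.g. PRIMARY KEY (user_id, region) NOT ENFORCED with 'document-id.key-delimiter' = '_'
        System.out.println(documentId(List.of(42L, "eu"), "_")); // 42_eu
    }
}
```

Because the id is a plain concatenation, the order of the key fields in the DDL and the choice of delimiter both change the resulting id, so rows written before and after such a change would map to different documents.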