Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3358#discussion_r102177012

--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -272,9 +308,115 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
 
 The difference is that now we do not need to provide a list of addresses of Elasticsearch nodes.
 
+### Handling Failing Elasticsearch Requests
+
+Elasticsearch action requests may fail for a variety of reasons, including
+temporarily saturated node queue capacity or malformed documents to be indexed.
+The Flink Elasticsearch Sink allows the user to specify how request
+failures are handled by implementing an `ActionRequestFailureHandler` and
+providing it to the constructor.
+
+Below is an example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+input.addSink(new ElasticsearchSink<>(
+  config, transportAddresses,
+  new ElasticsearchSinkFunction<String>() {...},
+  new ActionRequestFailureHandler() {
+    @Override
+    public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
+      // this example uses Apache Commons to search for nested exceptions
+
+      if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+        // full queue; re-add document for indexing
+        indexer.add(action);
+        return false;
+      } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
+        // malformed document; simply drop request without failing sink
+        return false;
+      } else {
+        // for all other failures, fail the sink
+        return true;
+      }
+    }
+}));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+input.addSink(new ElasticsearchSink(
+  config, transportAddresses,
+  new ElasticsearchSinkFunction[String] {...},
+  new ActionRequestFailureHandler {
+    override def onFailure(action: ActionRequest, failure: Throwable, indexer: RequestIndexer): Boolean = {
+      // this example uses Apache Commons to search for nested exceptions
+
+      if (ExceptionUtils.indexOfThrowable(failure, classOf[EsRejectedExecutionException]) >= 0) {
+        // full queue; re-add document for indexing
+        indexer.add(action)
+        false
+      } else if (ExceptionUtils.indexOfThrowable(failure, classOf[ElasticsearchParseException]) >= 0) {
+        // malformed document; simply drop request without failing sink
+        false
+      } else {
+        // for all other failures, fail the sink
+        true
+      }
+    }
+}))
+{% endhighlight %}
+</div>
+</div>
+
+The above example lets the sink re-add requests that failed due to
+queue capacity saturation and drop requests with malformed documents, without
+failing the sink. For all other failures, the sink will fail. If an `ActionRequestFailureHandler`
+is not provided to the constructor, the sink will fail for any kind of error.
+
+Note that `onFailure` is called only for failures that still occur after the
+internal `BulkProcessor` has finished all of its backoff retry attempts.
+By default, the `BulkProcessor` retries up to a maximum of 8 attempts with
+exponential backoff. For more information on the behaviour of the
+internal `BulkProcessor` and how to configure it, please see the following section.
+
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
+on failures will lead to longer checkpoints, as the sink will also
+need to wait for the re-added requests to be flushed when checkpointing.
+This also means that if re-added requests never succeed, the checkpoint will
+never finish.
+</p>

--- End diff --

I think the docs should mention the `NoOpActionRequestFailureHandler`.
Also I wonder if we should offer a default `RetryActionRequestFailureHandler`. I suspect that many users will need that. What do you think?
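A default retry handler along the lines suggested above could be sketched roughly as follows. This is only an illustration: the interfaces here are minimal stand-ins mirroring the shapes used in the diff, not the real Flink/Elasticsearch types, and `containsThrowable` is a hypothetical substitute for Apache Commons' `ExceptionUtils.indexOfThrowable` used in the docs example.

```java
// Minimal stand-ins for the real Flink/Elasticsearch types (assumed, not reproduced).
interface ActionRequest {}

interface RequestIndexer {
    void add(ActionRequest... requests);
}

interface ActionRequestFailureHandler {
    boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer);
}

// Sketch of a default retry handler: re-adds requests rejected due to
// queue saturation so the BulkProcessor retries them, and fails the
// sink for every other kind of failure.
class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
    @Override
    public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
        if (containsThrowable(failure, "EsRejectedExecutionException")) {
            // full queue; re-add the request for indexing
            indexer.add(action);
            return false; // do not fail the sink
        }
        // all other failures fail the sink
        return true;
    }

    // Walks the cause chain matching on simple class name; a stand-in for
    // Apache Commons' ExceptionUtils.indexOfThrowable.
    static boolean containsThrowable(Throwable t, String simpleName) {
        while (t != null) {
            if (t.getClass().getSimpleName().equals(simpleName)) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }
}
```

Users who only need "retry on rejection, fail on everything else" could then pass this handler to the sink constructor instead of writing their own.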