Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3358#discussion_r102177012
  
    --- Diff: docs/dev/connectors/elasticsearch.md ---
    @@ -272,9 +308,115 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
     The difference is that now we do not need to provide a list of addresses
     of Elasticsearch nodes.
     
    +### Handling Failing Elasticsearch Requests
    +
    +Elasticsearch action requests may fail due to a variety of reasons, including
    +temporarily saturated node queue capacity or malformed documents to be indexed.
    +The Flink Elasticsearch Sink allows the user to specify how request
    +failures are handled by implementing an `ActionRequestFailureHandler` and
    +providing it to the constructor.
    +
    +Below is an example:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<String> input = ...;
    +
    +input.addSink(new ElasticsearchSink<>(
    +    config, transportAddresses,
    +    new ElasticsearchSinkFunction<String>() {...},
    +    new ActionRequestFailureHandler() {
    +        @Override
    +        public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
    +            // this example uses Apache Commons to search for nested exceptions
    +
    +            if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
    +                // full queue; re-add document for indexing
    +                indexer.add(action);
    +                return false;
    +            } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
    +                // malformed document; simply drop request without failing sink
    +                return false;
    +            } else {
    +                // for all other failures, fail the sink
    +                return true;
    +            }
    +        }
    +}));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val input: DataStream[String] = ...
    +
    +input.addSink(new ElasticsearchSink(
    +    config, transportAddresses,
    +    new ElasticsearchSinkFunction[String] {...},
    +    new ActionRequestFailureHandler {
    +        override def onFailure(action: ActionRequest, failure: Throwable, indexer: RequestIndexer): Boolean = {
    +            // this example uses Apache Commons to search for nested exceptions
    +
    +            if (ExceptionUtils.indexOfThrowable(failure, classOf[EsRejectedExecutionException]) >= 0) {
    +                // full queue; re-add document for indexing
    +                indexer.add(action)
    +                false
    +            } else if (ExceptionUtils.indexOfThrowable(failure, classOf[ElasticsearchParseException]) >= 0) {
    +                // malformed document; simply drop request without failing sink
    +                false
    +            } else {
    +                // for all other failures, fail the sink
    +                true
    +            }
    +        }
    +}))
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +The above example will let the sink re-add requests that failed due to
    +queue capacity saturation and drop requests with malformed documents, without
    +failing the sink. For all other failures, the sink will fail. If an
    +`ActionRequestFailureHandler` is not provided to the constructor, the sink
    +will fail for any kind of error.
    +
    +Note that `onFailure` is called only for failures that still occur after the
    +`BulkProcessor` internally finishes all backoff retry attempts.
    +By default, the `BulkProcessor` retries up to a maximum of 8 attempts with
    +an exponential backoff. For more information on the behaviour of the
    +internal `BulkProcessor` and how to configure it, please see the following section.
    +
    +<p style="border-radius: 5px; padding: 5px" class="bg-danger">
    +<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
    +on failures will lead to longer checkpoints, as the sink will also
    +need to wait for the re-added requests to be flushed when checkpointing.
    +This also means that if re-added requests never succeed, the checkpoint will
    +never finish.
    +</p>
    --- End diff --
    
    I think the docs should mention the `NoOpActionRequestFailureHandler`.
    
    Also I wonder if we should offer a default `RetryActionRequestFailureHandler`. I suspect that many users will need that. What do you think?

