GitHub user tzulitai opened a pull request:
https://github.com/apache/flink/pull/3246
[FLINK-5353] [elasticsearch] User-provided failure handler for ElasticsearchSink
Only the last commit is relevant. This PR is based on #3112 so that the
functionality is added for all Elasticsearch versions.
It is also based on the work of @static-max in #2861, but takes a more general approach that solves both
[FLINK-5353](https://issues.apache.org/jira/browse/FLINK-5353) and
[FLINK-5122](https://issues.apache.org/jira/browse/FLINK-5122). This PR is mostly a preview of the functionality for our Elasticsearch users, since proper tests for the expected behaviours and the Javadoc updates are still pending.
With this PR, users can provide an `ActionRequestFailureHandler` that controls how failed Elasticsearch requests are dealt with.
Example:
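```java
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

public class ExampleActionRequestFailureHandler implements ActionRequestFailureHandler {

    // Returns true if the failure should fail the sink, false otherwise.
    @Override
    public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
        if (failure instanceof EsRejectedExecutionException) {
            // the node's queue is saturated: re-add the request so it is retried
            indexer.add(action);
            return false;
        } else if (failure instanceof ElasticsearchParseException) {
            // malformed document: simply drop the request without failing the sink
            return false;
        } else {
            // for all other failures, fail the sink
            return true;
        }
    }
}
```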
The example above lets the sink re-add requests that failed because an Elasticsearch node's queue capacity was saturated, and drop requests with malformed documents, without failing the sink. Any other failure fails the sink. The handler is passed to the `ElasticsearchSink` constructor.
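To make the meaning of the boolean return value concrete without pulling in Flink or Elasticsearch, the following self-contained sketch replays the example handler against hypothetical stand-in types: `Request`, `QueueFullException` and `MalformedDocException` stand in for `ActionRequest`, `EsRejectedExecutionException` and `ElasticsearchParseException`, and a plain `List` plays the role of the `RequestIndexer`. The `handleFailures` loop is an assumption about how a sink might act on the returned value (here, by throwing); it is not the actual `ElasticsearchSink` code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins only: none of these types are the real Flink or
// Elasticsearch classes. They model the handler contract described above.
class HandlerContractSketch {

    // Stand-in for ActionRequest.
    static final class Request {
        final String id;
        Request(String id) { this.id = id; }
    }

    // Stand-in for EsRejectedExecutionException (saturated queue).
    static final class QueueFullException extends RuntimeException {}

    // Stand-in for ElasticsearchParseException (malformed document).
    static final class MalformedDocException extends RuntimeException {}

    // Same shape as the handler in this PR; a List replaces RequestIndexer.
    // Returning true means "fail the sink".
    interface FailureHandler {
        boolean onFailure(Request request, Throwable failure, List<Request> indexer);
    }

    // The example handler from above, rewritten against the stand-ins.
    static final FailureHandler EXAMPLE = (request, failure, indexer) -> {
        if (failure instanceof QueueFullException) {
            indexer.add(request); // re-add so it is sent again
            return false;
        } else if (failure instanceof MalformedDocException) {
            return false;         // drop the request, keep the sink running
        }
        return true;              // every other failure fails the sink
    };

    // Passes each failed request to the handler and returns the re-added
    // ones; throws as soon as the handler asks to fail the sink.
    static List<Request> handleFailures(List<Request> failed, List<Throwable> causes,
                                        FailureHandler handler) {
        List<Request> reAdded = new ArrayList<>();
        for (int i = 0; i < failed.size(); i++) {
            Request request = failed.get(i);
            Throwable cause = causes.get(i);
            if (handler.onFailure(request, cause, reAdded)) {
                throw new RuntimeException("sink failed on request " + request.id, cause);
            }
        }
        return reAdded;
    }
}
```

Given one queue-full failure and one parse failure, only the first request ends up in the returned list; an unrelated failure such as an `IllegalStateException` makes `handleFailures` throw.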
Note that the `onFailure` method is called only after the internal
`BulkProcessor` finishes all backoff retry attempts for temporary
`EsRejectedExecutionException`s (saturated ES node queue capacity).
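The ordering described here, with backoff retries first and the user handler only after they are exhausted, can be sketched in plain Java. This is an illustration, not Elasticsearch's built-in backoff: the simple doubling schedule and the names `exponentialDelays` and `tryWithBackoff` are assumptions, and the sleep is injected as a `LongConsumer` so the sketch can run without actually waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

// Illustrative only: a doubling schedule, not necessarily the one used by
// the Elasticsearch BulkProcessor's built-in backoff.
class BackoffThenHandlerSketch {

    // Delays before each retry: initial, 2 * initial, 4 * initial, ...
    static List<Long> exponentialDelays(long initialDelayMs, int maxRetries) {
        List<Long> delays = new ArrayList<>();
        long delay = initialDelayMs;
        for (int i = 0; i < maxRetries; i++) {
            delays.add(delay);
            delay *= 2;
        }
        return delays;
    }

    // Runs the attempt once, then once more after each delay while it keeps
    // failing. Returns true on the first success; false means every retry
    // was used up, which is the point where onFailure would be invoked.
    static boolean tryWithBackoff(BooleanSupplier attempt, List<Long> delays,
                                  LongConsumer sleeper) {
        if (attempt.getAsBoolean()) {
            return true;
        }
        for (long delay : delays) {
            sleeper.accept(delay);
            if (attempt.getAsBoolean()) {
                return true;
            }
        }
        return false;
    }
}
```

In real use the sleeper would call `Thread.sleep(ms)` and restore the interrupt flag on `InterruptedException`; only a `false` result from `tryWithBackoff` would hand the request to the user's handler.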
### Alternatives:
1. Currently, every failure reported in the `afterBulk` callback is passed to the handler's `onFailure`. Instead, we could pass only specific exceptions and let the user decide how to handle those.
2. The original `ElasticsearchSinkFunction` and the new `ActionRequestFailureHandler` interface could perhaps be merged into one.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tzulitai/flink FLINK-5353
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3246.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3246
----
commit bf84c0aa91924aca779189b628a656d9b54e36db
Author: Mike Dias
Date: 2016-11-07T20:09:48Z
[FLINK-4988] Elasticsearch 5.x support
commit 4efb2d497759b3688fe80261df19bb1e1c3f1c21
Author: Tzu-Li (Gordon) Tai
Date: 2017-01-12T13:21:56Z
[FLINK-4988] [elasticsearch] Restructure Elasticsearch connectors
commit be35862383b69c0d65fefd2c48c772a81fceb8d5
Author: Max Kuklinski
Date: 2016-11-23T16:54:11Z
[FLINK-5122] [elasticsearch] Retry temporary Elasticsearch request errors.
Covered exceptions are: Timeouts, No Master, UnavailableShardsException, bulk queue on node full
commit fa67e8be5ca8e90d47ad12e947eac7b695e8fcca
Author: Tzu-Li (Gordon) Tai
Date: 2017-01-30T05:55:26Z
[FLINK-5353] [elasticsearch] User-provided failure handler for ElasticsearchSink
This commit fixes both FLINK-5353 and FLINK-5122. It allows users to implement a failure handler to control how failed action requests are dealt with.
The commit also includes general improvements to FLINK-5122:
1. Use the built-in backoff functionality in the Elasticsearch BulkProcessor (not available for Elasticsearch 1.x)
2. Migrate the `checkErrorAndRetryBulk` functionality to the new failure handler
----
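The last commit moves the `checkErrorAndRetryBulk` functionality into the failure handler. A handler that keeps the FLINK-5122 behaviour of retrying temporary errors has to recognise those errors even when they arrive wrapped in another exception. The sketch below checks the whole cause chain; the four `...StandIn` classes are hypothetical placeholders for the error kinds listed in the FLINK-5122 commit (timeouts, no master, `UnavailableShardsException`, full bulk queue on a node), not the real Elasticsearch types, and `isTemporary` is an illustrative name.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

class TemporaryFailureSketch {

    // Hypothetical stand-ins for the temporary error kinds; not Elasticsearch types.
    static final class TimeoutStandIn extends RuntimeException {}
    static final class NoMasterStandIn extends RuntimeException {}
    static final class UnavailableShardsStandIn extends RuntimeException {}
    static final class QueueFullStandIn extends RuntimeException {}

    static final List<Class<? extends Throwable>> TEMPORARY =
            Arrays.<Class<? extends Throwable>>asList(
                    TimeoutStandIn.class, NoMasterStandIn.class,
                    UnavailableShardsStandIn.class, QueueFullStandIn.class);

    // True if the failure or any of its causes is one of the temporary types.
    static boolean isTemporary(Throwable failure) {
        // Identity-based set so that a cyclic cause chain cannot loop forever.
        Set<Throwable> seen =
                Collections.newSetFromMap(new IdentityHashMap<Throwable, Boolean>());
        for (Throwable t = failure; t != null && seen.add(t); t = t.getCause()) {
            for (Class<? extends Throwable> type : TEMPORARY) {
                if (type.isInstance(t)) {
                    return true;
                }
            }
        }
        return false;
    }
}
```

Inside `onFailure`, such a handler could re-add the request when `isTemporary(failure)` is true and decide separately, for example by failing the sink, for everything else.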