[
https://issues.apache.org/jira/browse/FLUME-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13850176#comment-13850176
]
Ashish Paliwal commented on FLUME-2254:
---------------------------------------
This looks like an expectation of exactly-once semantics in the error case.
[~hshreedharan] AFAIK, we don't have exactly-once semantics in Flume, and
duplicate events may be present in failure scenarios. Do we want to fix
this?
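
To illustrate why duplicates appear in this scenario, here is a minimal simulation. It uses plain Java with no Flume or Elasticsearch classes; the class name, the method name, and the "poison" event are made up for illustration. It assumes the behaviour described in the report: the good items of a bulk are indexed, the sink fails on the bad one, and the rolled-back transaction puts the whole batch back in the channel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class AtLeastOnceDemo {
    /**
     * Simulates a sink that takes the whole channel as one batch per attempt.
     * Returns the contents of the (simulated) index after maxAttempts attempts.
     */
    static List<String> deliver(List<String> source, String poison, int maxAttempts) {
        Deque<String> channel = new ArrayDeque<>(source); // stand-in for the Flume channel
        List<String> index = new ArrayList<>();           // stand-in for the ES index

        for (int attempt = 0; attempt < maxAttempts && !channel.isEmpty(); attempt++) {
            List<String> batch = new ArrayList<>(channel); // take the batch inside a "transaction"
            channel.clear();

            boolean failed = false;
            for (String event : batch) {
                if (event.equals(poison)) {
                    failed = true;        // this item is rejected by the index
                } else {
                    index.add(event);     // the other items are indexed anyway
                }
            }
            if (failed) {
                channel.addAll(batch);    // rollback: the whole batch returns to the channel
            }
        }
        return index;
    }

    public static void main(String[] args) {
        System.out.println(deliver(Arrays.asList("a", "bad", "b"), "bad", 3));
    }
}
```

Every retry indexes the good events again, so the index grows with the number of attempts while the bad event never leaves the channel.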
> Improve log when there is a failure in a BulkRequest in ES Sink and avoid
> stuck failures in the channel
> -------------------------------------------------------------------------------------------------------
>
> Key: FLUME-2254
> URL: https://issues.apache.org/jira/browse/FLUME-2254
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: v1.4.0
> Reporter: Luis Pigueiras
> Assignee: Ashish Paliwal
> Priority: Minor
>
> I found two problems when a {{BulkRequest}} to ElasticSearch fails in the
> ElasticSearch sink.
> One is that if a single insertion fails inside the bulk, all the events from
> the BulkRequest get stuck in the channel (even the ones that are not failing,
> because if one fails the entire request is treated as failed), and our
> indexes grow because Flume keeps retrying and inserts the same events (the
> ones that don't have any error) into ES again and again.
> The other is that it's difficult to see in the logs which events are causing
> the entire request to fail. Currently you can only see the error output from
> ElasticSearch, and it would be very nice to know which events are not being
> inserted into ElasticSearch.
> My proposal to solve this is:
> - Save all the Events of the BulkRequest in some structure.
> - Remove the throw of {{EventDeliveryException}}.
> - If there is a failure, iterate over the events, sending one request to ES
> per event, to find out which events cannot be inserted. If one of them
> fails, print the error and the event to the logs.
> Here is an example of how this can be implemented. It's not the smartest
> way to do it, but it works for me.
> {code}
> @@ -81,6 +80,8 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
> try {
> txn.begin();
> BulkRequestBuilder bulkRequest = client.prepareBulk();
> + LinkedList<Event> events = new LinkedList<Event>();
> +
> for (int i = 0; i < batchSize; i++) {
> Event event = channel.take();
>
> @@ -88,14 +89,14 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
> break;
> }
>
> IndexRequestBuilder indexRequest =
> indexRequestFactory.createIndexRequest(
> client, indexName, indexType, event);
>
> if (ttlMs > 0) {
> indexRequest.setTTL(ttlMs);
> }
>
> + events.add(event);
> bulkRequest.add(indexRequest);
> }
> @@ -116,7 +117,35 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
>
> BulkResponse bulkResponse = bulkRequest.execute().actionGet();
> if (bulkResponse.hasFailures()) {
> - throw new EventDeliveryException(bulkResponse.buildFailureMessage());
> + logger
> + .warn("There is a failure in the bulk request with this output\n"
> + + bulkResponse.buildFailureMessage());
> + logger
> + .warn("Trying to do the requests separately to know what notifications"
> + + " of the requests have errors");
> + for (Event event : events) {
> + try {
> + indexRequestFactory
> + .createIndexRequest(client, indexName, indexType, event)
> + .execute().actionGet();
> + } catch (Exception e) {
> + logger.error(e.getMessage());
> +
> + if (serializer instanceof ElasticSearchEventSerializer) {
> + XContentBuilder builder = (XContentBuilder) ((ElasticSearchEventSerializer) serializer)
> + .getContentBuilder(event);
> + logger
> + .error("There is an error with the following notification:\n"
> + + builder.string());
> + } else {
> + logger
> + .error("There is no possibility of printing the notification because"
> + + " the serializer is a different instance from"
> + + " ElasticSearchEventSerializer");
> + }
> +
> + }
> + }
> }
> }
> txn.commit();
> {code}
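
The patch above resends every event one by one after a failed bulk, which also re-indexes the events that had already succeeded. A possible alternative is to pair each failed item of the bulk result with the event saved at the same position. The following is only a sketch under stated assumptions: it assumes the bulk response reports one result per item, in request order, and `ItemResult` and `describeFailures` are hypothetical stand-ins, not the Elasticsearch client API. Events are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BulkFailureReport {
    /** Hypothetical stand-in for one per-item result of a bulk response. */
    static final class ItemResult {
        final int itemId;            // position of the item in the bulk request
        final String failureMessage; // null when the item was indexed successfully

        ItemResult(int itemId, String failureMessage) {
            this.itemId = itemId;
            this.failureMessage = failureMessage;
        }

        boolean isFailed() {
            return failureMessage != null;
        }
    }

    /**
     * Builds one log line per failed item, using the events saved while the
     * bulk request was being assembled. No event is sent again.
     */
    static List<String> describeFailures(List<String> events, List<ItemResult> results) {
        List<String> report = new ArrayList<>();
        for (ItemResult result : results) {
            if (result.isFailed()) {
                report.add("event[" + result.itemId + "]=" + events.get(result.itemId)
                        + " failed: " + result.failureMessage);
            }
        }
        return report;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("{\"a\":1}", "not-json", "{\"b\":2}");
        List<ItemResult> results = Arrays.asList(
                new ItemResult(0, null),
                new ItemResult(1, "parse error"),
                new ItemResult(2, null));
        for (String line : describeFailures(events, results)) {
            System.out.println(line);
        }
    }
}
```

With this approach the sink logs only the failing events and leaves the successful ones alone, so nothing is indexed twice as a side effect of finding the failures. Whether to commit or roll back the transaction afterwards is a separate decision.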