[ 
https://issues.apache.org/jira/browse/FLUME-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13850176#comment-13850176
 ] 

Ashish Paliwal commented on FLUME-2254:
---------------------------------------

This looks like an expectation of exactly-once semantics in case of an error. 

[~hshreedharan] AFAIK, Flume does not provide exactly-once semantics, and 
duplicate events may be present in failure scenarios. Do we want to fix 
this?
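
To make the duplicate scenario concrete, here is a minimal, self-contained 
sketch. It is not Flume or ElasticSearch code: the class AtLeastOnceDemo and 
its methods index, sendBatch and deliver are hypothetical stand-ins, and the 
"store" is just a list. It only assumes what the report describes: documents 
that succeed are stored right away, one failure fails the whole batch, and a 
failed batch is rolled back and retried as a whole.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of at-least-once delivery: a failed batch is retried whole. */
public class AtLeastOnceDemo {

    /** Hypothetical index: accepts every event except the literal "bad". */
    public static boolean index(String event, List<String> store) {
        if (event.equals("bad")) {
            return false;       // this single document is rejected
        }
        store.add(event);       // accepted documents are stored immediately
        return true;
    }

    /** Sends the whole batch; true only if every event was accepted. */
    public static boolean sendBatch(List<String> batch, List<String> store) {
        boolean allOk = true;
        for (String event : batch) {
            allOk &= index(event, store);
        }
        return allOk;
    }

    /** Retries the same batch up to maxAttempts times, as a rollback would. */
    public static List<String> deliver(List<String> batch, int maxAttempts) {
        List<String> store = new ArrayList<String>();
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (sendBatch(batch, store)) {
                break;          // "commit": the batch leaves the channel
            }
            // "rollback": the batch stays in the channel and is taken again
        }
        return store;
    }

    public static void main(String[] args) {
        // One bad event keeps the batch failing, so "a" and "b" pile up.
        System.out.println(deliver(Arrays.asList("a", "bad", "b"), 3));
        // prints [a, b, a, b, a, b]
    }
}
```

With three attempts the two good events are stored three times each, which 
matches the growing indexes described in the report. A batch without the bad 
event is stored once and then committed.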

> Improve log when there is a failure in a BulkRequest in ES Sink and avoid 
> stuck failures in the channel
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-2254
>                 URL: https://issues.apache.org/jira/browse/FLUME-2254
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.4.0
>            Reporter: Luis Pigueiras
>            Assignee: Ashish Paliwal
>            Priority: Minor
>
> I found two problems when a {{BulkRequest}} to ElasticSearch fails in the 
> ElasticSearch sink. 
> The first is that if a single insertion inside the bulk fails, all the events 
> from the BulkRequest get stuck in the channel (even the ones that did not 
> fail, because one failure fails the entire request). Our indexes then grow 
> because Flume keeps retrying, inserting the events that have no error into 
> ES again and again. 
> The second is that it is difficult to see in the logs which events caused 
> the entire request to fail. Currently you can only see the error output from 
> ElasticSearch, and it would be very nice to know which events are not being 
> inserted in ElasticSearch.
> My proposal to solve this is:
> - Save all the events of the BulkRequest in some structure.
> - Remove the throw of {{EventDeliveryException}}.
> - If there is a failure, send each event to ES in its own request to find 
> out which events cannot be inserted. If one of them fails, print the error 
> and the event to the logs.
> Here is an example of how this can be implemented. It's not the smartest way 
> to do it, but it works for me.
> {code}
> @@ -81,6 +80,8 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
>      try {
>        txn.begin();
>        BulkRequestBuilder bulkRequest = client.prepareBulk();
> +      LinkedList<Event> events = new LinkedList<Event>();
> +
>        for (int i = 0; i < batchSize; i++) {
>          Event event = channel.take();
>  
> @@ -88,14 +89,14 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
>            break;
>          }
>  
>         IndexRequestBuilder indexRequest =
>             indexRequestFactory.createIndexRequest(
>                 client, indexName, indexType, event);
>  
>          if (ttlMs > 0) {
>            indexRequest.setTTL(ttlMs);
>          }
>  
> +        events.add(event);
>          bulkRequest.add(indexRequest);
>        }
> @@ -116,7 +117,35 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
>  
>          BulkResponse bulkResponse = bulkRequest.execute().actionGet();
>          if (bulkResponse.hasFailures()) {
> -          throw new EventDeliveryException(bulkResponse.buildFailureMessage());
> +          logger
> +              .warn("There is a failure in the bulk request with this output\n"
> +                  + bulkResponse.buildFailureMessage());
> +          logger
> +              .warn("Trying to do the requests separately to know what notifications"
> +                  + " of the requests have errors");
> +          for (Event event : events) {
> +            try {
> +              indexRequestFactory
> +                  .createIndexRequest(client, indexName, indexType, event)
> +                  .execute().actionGet();
> +            } catch (Exception e) {
> +              logger.error(e.getMessage());
> +
> +              if (serializer instanceof ElasticSearchEventSerializer) {
> +                XContentBuilder builder = (XContentBuilder) ((ElasticSearchEventSerializer) serializer)
> +                    .getContentBuilder(event);
> +                logger
> +                    .error("There is an error with the following notification:\n"
> +                        + builder.string());
> +              } else {
> +                logger
> +                    .error("There is no possibility of printing the notification because"
> +                        + " the serializer is a different instance from"
> +                        + " ElasticSearchEventSerializer");
> +              }
> +
> +            }
> +          }
>          }
>        }
>        txn.commit();
> {code}



--
This message was sent by Atlassian JIRA
(v6.1.4#6159)
