Luis Pigueiras created FLUME-2254:
-------------------------------------

             Summary: Improve log when there is a failure in a BulkRequest in 
ES Sink and avoid stuck failures in the channel
                 Key: FLUME-2254
                 URL: https://issues.apache.org/jira/browse/FLUME-2254
             Project: Flume
          Issue Type: Bug
          Components: Sinks+Sources
    Affects Versions: v1.4.0
            Reporter: Luis Pigueiras
            Priority: Minor


I found two problems when a {{BulkRequest}} to ElasticSearch fails in the 
ElasticSearch sink. 

The first is that if a single insertion inside the bulk fails, all the events 
from the BulkRequest get stuck in the channel, including the ones that did not 
fail, because one failure makes the entire request fail. Flume then keeps 
retrying the whole batch, so the events that have no error are inserted into 
ES again and again and our indexes keep growing. 

The second is that it is difficult to see in the logs which events are causing 
the entire request to fail. Right now you can only see the error output from 
ElasticSearch, and it would be very useful to know which events are not being 
inserted.

My proposal to solve this is:
- To save all the Events of the BulkRequest in some structure.
- Remove the throw of {{EventDeliveryException}}.
- If there is a failure, iterate over the events and send one request to ES 
per event to find out which events cannot be inserted. If one of them fails, 
print the error and the event to the logs.
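
The steps in this list can be sketched independently of Flume and ElasticSearch. The snippet below is only an illustration under stated assumptions: {{SingleIndexer}} and {{findRejected}} are hypothetical names standing in for the per-event index call and the fallback loop, and events are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: none of these types are Flume or ElasticSearch classes.
class BulkFallbackSketch {

    /** Stand-in for a single-document index call; throws when the document is rejected. */
    interface SingleIndexer {
        void index(String event) throws Exception;
    }

    /**
     * Sends every event of a failed bulk one at a time and returns a
     * description of each event that was rejected, with its error message.
     */
    static List<String> findRejected(List<String> events, SingleIndexer indexer) {
        List<String> rejected = new ArrayList<>();
        for (String event : events) {
            try {
                indexer.index(event);
            } catch (Exception e) {
                // Record the failure instead of rethrowing, so one bad
                // event does not block the rest of the batch.
                rejected.add(event + " -> " + e.getMessage());
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("ok-1", "bad-2", "ok-3");
        // Fake indexer that rejects events whose body starts with "bad".
        SingleIndexer indexer = event -> {
            if (event.startsWith("bad")) {
                throw new IllegalArgumentException("parse error");
            }
        };
        for (String line : findRejected(batch, indexer)) {
            System.err.println("Could not index event: " + line);
        }
    }
}
```

Because the loop catches the exception per event, the caller can log the rejected events and still commit the transaction, which is what keeps the batch from getting stuck in the channel.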

Here is an example of how this can be implemented. It is not the smartest way 
to do it, but it works for me.

{code}
@@ -81,6 +80,8 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
     try {
       txn.begin();
       BulkRequestBuilder bulkRequest = client.prepareBulk();
+      LinkedList<Event> events = new LinkedList<Event>();
+
       for (int i = 0; i < batchSize; i++) {
         Event event = channel.take();
 
@@ -88,14 +89,14 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
           break;
         }
 
        IndexRequestBuilder indexRequest =
            indexRequestFactory.createIndexRequest(
                client, indexName, indexType, event);
 
         if (ttlMs > 0) {
           indexRequest.setTTL(ttlMs);
         }
 
+        events.add(event);
         bulkRequest.add(indexRequest);
       }


@@ -116,7 +117,35 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
 
         BulkResponse bulkResponse = bulkRequest.execute().actionGet();
         if (bulkResponse.hasFailures()) {
-          throw new EventDeliveryException(bulkResponse.buildFailureMessage());
+          logger
+              .warn("There is a failure in the bulk request with this output\n"
+                  + bulkResponse.buildFailureMessage());
+          logger
+              .warn("Trying to do the requests separately to know what notifications"
+                  + " of the requests have errors");
+          for (Event event : events) {
+            try {
+              indexRequestFactory
+                  .createIndexRequest(client, indexName, indexType, event)
+                  .execute().actionGet();
+            } catch (Exception e) {
+              logger.error(e.getMessage());
+
+              if (serializer instanceof ElasticSearchEventSerializer) {
+                XContentBuilder builder = (XContentBuilder) ((ElasticSearchEventSerializer) serializer)
+                    .getContentBuilder(event);
+                logger
+                    .error("There is an error with the following notification:\n"
+                        + builder.string());
+              } else {
+                logger
+                    .error("There is no possibility of printing the notification because"
+                        + " the serializer is a different instance from"
+                        + " ElasticSearchEventSerializer");
+              }
+
+            }
+          }
         }
       }
       txn.commit();
{code}
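
One caveat with the patch above: a bulk request in ElasticSearch is not atomic, so the documents that were accepted in the failed bulk are sent a second time by the per-event loop. A possible alternative, sketched below under assumptions, is to read the per-item outcome of the bulk response and log only the failed items. {{ItemResult}} and {{describeFailures}} are hypothetical stand-ins, not the ElasticSearch API, and the sketch assumes each item id is the position of the event in the batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: ItemResult is a stand-in for a per-item bulk outcome.
class BulkItemReportSketch {

    /** Outcome of one item in a bulk; a null failureMessage means it was indexed. */
    static final class ItemResult {
        final int itemId;            // assumed: position of the event in the batch
        final String failureMessage; // null when the item succeeded

        ItemResult(int itemId, String failureMessage) {
            this.itemId = itemId;
            this.failureMessage = failureMessage;
        }

        boolean isFailed() {
            return failureMessage != null;
        }
    }

    /** Builds one log line per failed item, pairing the event with its error. */
    static List<String> describeFailures(List<String> events, List<ItemResult> results) {
        List<String> lines = new ArrayList<>();
        for (ItemResult result : results) {
            if (result.isFailed()) {
                lines.add("event[" + result.itemId + "]=" + events.get(result.itemId)
                    + " failed: " + result.failureMessage);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "b", "c");
        List<ItemResult> results = Arrays.asList(
            new ItemResult(0, null),
            new ItemResult(1, "mapping error"),
            new ItemResult(2, null));
        for (String line : describeFailures(events, results)) {
            System.err.println(line);
        }
    }
}
```

This keeps the {{events}} list from the patch, but avoids the extra round trips and the repeated insertion of events that had already succeeded.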



--
This message was sent by Atlassian JIRA
(v6.1#6144)
