[ 
https://issues.apache.org/jira/browse/METRON-326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15536022#comment-15536022
 ] 

ASF GitHub Bot commented on METRON-326:
---------------------------------------

Github user justinleet commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/286#discussion_r81339694
  
    --- Diff: 
metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
 ---
    @@ -166,14 +166,37 @@ public void write(String sensorType, 
WriterConfiguration configurations, Iterabl
     
         }
     
    -    BulkResponse resp = bulkRequest.execute().actionGet();
    -
    -    if (resp.hasFailures()) {
    +    BulkResponse bulkResponse = bulkRequest.execute().actionGet();
     
    -      throw new Exception(resp.buildFailureMessage());
    +    return buildWriteReponse(tuples, bulkResponse);
    +  }
     
    +  protected BulkWriterResponse buildWriteReponse(Iterable<Tuple> tuples, 
BulkResponse bulkResponse) throws Exception {
    +    // Elasticsearch responses are in the same order as the request, 
giving us an implicit mapping with Tuples
    +    BulkWriterResponse writerResponse = new BulkWriterResponse();
    +    if (bulkResponse.hasFailures()) {
    +      Iterator<BulkItemResponse> respIter = bulkResponse.iterator();
    +      Iterator<Tuple> tupleIter = tuples.iterator();
    +      while (respIter.hasNext() && tupleIter.hasNext()) {
    +        BulkItemResponse item = respIter.next();
    +        Tuple tuple = tupleIter.next();
    +
    +        if (item.isFailed()) {
    +          writerResponse.addError(item.getFailure().getCause(), tuple);
    +        } else {
    +          writerResponse.addSuccess(tuple);
    +        }
    +
    +        // Should never happen, so fail the entire batch if it does.
    +        if (respIter.hasNext() != tupleIter.hasNext()) {
    +          throw new Exception(bulkResponse.buildFailureMessage());
    --- End diff --
    
    Good catch, I'll make that change.  IllegalStateException seems pretty 
reasonable.


> Error Handling in ElasticsearchWriter
> -------------------------------------
>
>                 Key: METRON-326
>                 URL: https://issues.apache.org/jira/browse/METRON-326
>             Project: Metron
>          Issue Type: Bug
>            Reporter: Ajay Yadav
>            Assignee: Justin Leet
>
> In Elasticsearch writer we raise a exception if BulkResponse object has 
> failures and that results in failing the whole batch even if some objects 
> failed in it. This has spiral effect specially when there is continuous 
> stream of bad messages and errorStream is tied to indexingBolt. 
> If possible we should iterate through items in BulkResponse object and send 
> only failed messages to errorStream.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to