[ 
https://issues.apache.org/jira/browse/FLUME-2769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709769#comment-14709769
 ] 

Adam Gent commented on FLUME-2769:
----------------------------------

This change worked for me: 
https://github.com/agentgt/flume/commit/09089257df239eeac942ef64b2d24c68efb5bec7

> Elastic Search Sink throws exceptions on messages that look like JSON
> ---------------------------------------------------------------------
>
>                 Key: FLUME-2769
>                 URL: https://issues.apache.org/jira/browse/FLUME-2769
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: 1.6
>            Reporter: Adam Gent
>
> We have a message body that looks like JSON but is not, and the
> Elasticsearch content type guesser gets confused:
> {code}
> 24 Aug 2015 11:15:58,693 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.elasticsearch.ElasticSearchSink.process:225)  - Failed to commit transaction. Transaction rolled back.
> org.elasticsearch.common.jackson.core.JsonParseException: Unrecognized token 'Flush': was expecting ('true', 'false' or 'null')
>  at [Source: [B@3ce85c92; line: 1, column: 7]
>         at org.elasticsearch.common.jackson.core.JsonParser._constructError(JsonParser.java:1487)
>         at org.elasticsearch.common.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)
>         at org.elasticsearch.common.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3323)
>         at org.elasticsearch.common.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2482)
>         at org.elasticsearch.common.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:801)
>         at org.elasticsearch.common.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:697)
>         at org.elasticsearch.common.xcontent.json.JsonXContentParser.nextToken(JsonXContentParser.java:51)
>         at org.apache.flume.sink.elasticsearch.ContentBuilderUtil.addComplexField(ContentBuilderUtil.java:60)
>         at org.apache.flume.sink.elasticsearch.ContentBuilderUtil.appendField(ContentBuilderUtil.java:47)
>         at org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer.appendBody(ElasticSearchLogStashEventSerializer.java:87)
>         at org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer.getContentBuilder(ElasticSearchLogStashEventSerializer.java:79)
>         at org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer.getContentBuilder(ElasticSearchLogStashEventSerializer.java:73)
>         at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.addEvent(ElasticSearchTransportClient.java:164)
>         at org.apache.flume.sink.elasticsearch.ElasticSearchSink.process(ElasticSearchSink.java:189)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> It appears that this was also a previous regression:
> {code:java}
>   public static void addComplexField(XContentBuilder builder, String fieldName,
>       XContentType contentType, byte[] data) throws IOException {
>     XContentParser parser =
>       XContentFactory.xContent(contentType).createParser(data);
>     parser.nextToken();
>     // Add the field name, but not the value.
>     builder.field(fieldName);
>     try {
>       // This will add the whole parsed content as the value of the field.
>       builder.copyCurrentStructure(parser);
>     } catch (JsonParseException ex) {
>       // If we get an exception here the most likely cause is nested JSON that
>       // can't be figured out in the body. At this point just push it through
>       // as is, we have already added the field so don't do it again
>       builder.endObject();
>       builder.field(fieldName, new String(data, charset));
>     } finally {
>       if (parser != null) {
>         parser.close();
>       }
>     }
>   }
> {code}
> Notice that the {{try}} block does not enclose the {{nextToken}} call, which
> is where the exception is now thrown.
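
For illustration only, here is a minimal, self-contained sketch of the pattern
the description points at: the first nextToken() call sits inside the try, so a
parse failure falls back to storing the body as a plain string. This is not the
content of the linked commit and it does not use the Elasticsearch classes.
StubParser, addField, the "body" field name and the sample message are all
hypothetical stand-ins, and UTF-8 is assumed for the charset.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

final class AddComplexFieldSketch {

  /** Hypothetical stand-in for the JSON parser; not the Elasticsearch class. */
  static final class StubParser implements AutoCloseable {
    private final String text;

    StubParser(byte[] data) {
      this.text = new String(data, StandardCharsets.UTF_8).trim();
    }

    /** Fails on the very first token unless the body is wrapped in braces. */
    String nextToken() throws IOException {
      if (!(text.startsWith("{") && text.endsWith("}"))) {
        throw new IOException("Unrecognized token in body: " + text);
      }
      return text;
    }

    @Override
    public void close() {
      // Nothing to release in the stub.
    }
  }

  /**
   * Adds the body to the document. Returns true if it was stored as parsed
   * content, false if parsing failed and the raw string was stored instead.
   */
  static boolean addField(Map<String, String> doc, String fieldName, byte[] data) {
    // The first nextToken() call is inside the try, so a failure there is
    // caught here rather than propagating to the caller.
    try (StubParser parser = new StubParser(data)) {
      doc.put(fieldName, parser.nextToken());
      return true;
    } catch (IOException ex) {
      // Fall back to the body as a plain string.
      doc.put(fieldName, new String(data, StandardCharsets.UTF_8));
      return false;
    }
  }

  public static void main(String[] args) {
    Map<String, String> doc = new LinkedHashMap<>();
    boolean parsed =
        addField(doc, "body", "Flush {cache}".getBytes(StandardCharsets.UTF_8));
    System.out.println("parsed=" + parsed + " doc=" + doc);
  }
}
```

In the real method the same idea would mean moving parser.nextToken() (and
possibly createParser) under the existing try, so that the existing
JsonParseException handler also covers the first token.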



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
