[
https://issues.apache.org/jira/browse/FLUME-2769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709769#comment-14709769
]
Adam Gent commented on FLUME-2769:
----------------------------------
This change worked for me:
https://github.com/agentgt/flume/commit/09089257df239eeac942ef64b2d24c68efb5bec7
> Elastic Search Sink throws exceptions on messages that look like JSON
> ---------------------------------------------------------------------
>
> Key: FLUME-2769
> URL: https://issues.apache.org/jira/browse/FLUME-2769
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: 1.6
> Reporter: Adam Gent
>
> We have a message body that looks like JSON but is not and the elastic
> content type guesser gets confused:
> {code}
> 24 Aug 2015 11:15:58,693 ERROR
> [SinkRunner-PollingRunner-DefaultSinkProcessor]
> (org.apache.flume.sink.elasticsearch.ElasticSearchSink.process:225) - Failed
> to commit transaction. Transaction rolled back.
> org.elasticsearch.common.jackson.core.JsonParseException: Unrecognized token
> 'Flush': was expecting ('true', 'false' or 'null')
> at [Source: [B@3ce85c92; line: 1, column: 7]
> at
> org.elasticsearch.common.jackson.core.JsonParser._constructError(JsonParser.java:1487)
> at
> org.elasticsearch.common.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)
> at
> org.elasticsearch.common.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3323)
> at
> org.elasticsearch.common.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2482)
> at
> org.elasticsearch.common.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:801)
> at
> org.elasticsearch.common.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:697)
> at
> org.elasticsearch.common.xcontent.json.JsonXContentParser.nextToken(JsonXContentParser.java:51)
> at
> org.apache.flume.sink.elasticsearch.ContentBuilderUtil.addComplexField(ContentBuilderUtil.java:60)
> at
> org.apache.flume.sink.elasticsearch.ContentBuilderUtil.appendField(ContentBuilderUtil.java:47)
> at
> org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer.appendBody(ElasticSearchLogStashEventSerializer.java:87)
> at
> org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer.getContentBuilder(ElasticSearchLogStashEventSerializer.java:79)
> at
> org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer.getContentBuilder(ElasticSearchLogStashEventSerializer.java:73)
> at
> org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.addEvent(ElasticSearchTransportClient.java:164)
> at
> org.apache.flume.sink.elasticsearch.ElasticSearchSink.process(ElasticSearchSink.java:189)
> at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> It appears that this was also a previous regression:
> {code:java}
> public static void addComplexField(XContentBuilder builder, String
> fieldName,
> XContentType contentType, byte[] data) throws IOException {
> XContentParser parser =
> XContentFactory.xContent(contentType).createParser(data);
> parser.nextToken();
> // Add the field name, but not the value.
> builder.field(fieldName);
> try {
> // This will add the whole parsed content as the value of the field.
> builder.copyCurrentStructure(parser);
> } catch (JsonParseException ex) {
> // If we get an exception here the most likely cause is nested JSON that
> // can't be figured out in the body. At this point just push it through
> // as is, we have already added the field so don't do it again
> builder.endObject();
> builder.field(fieldName, new String(data, charset));
> } finally {
> if (parser != null) {
> parser.close();
> }
> }
> }
> {code}
> Notice how the {{try}} statement is not around the {{nextToken}} which is now
> throwing the exception.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)