[ 
https://issues.apache.org/jira/browse/BEAM-5725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16670108#comment-16670108
 ] 

Wout Scheepers commented on BEAM-5725:
--------------------------------------

I've been digging into it and came up with the following so far:

As Tim already pointed out, the problem here is that parseResponse(response) is 
indeed not repeatable: the content can only be consumed once, because the 
Elastic Response object encapsulates an HttpEntity object.
I added a unit test[1] that reproduces the bug by trying to insert a valid 
document with the retryConfiguration set. 
It shows that handleRetry() does not have to be called for the bug to appear.

My first thought on a solution was to call reset() on the InputStream of the 
entity content. However, reset() is not supported by the InputStream used in 
HttpEntity.
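
To illustrate the failure mode, here is a minimal stdlib-only sketch. It does 
not use the real HttpEntity classes: oneShot() is a hypothetical stand-in for a 
content stream without mark/reset support. The second read comes back empty, 
and reset() is rejected.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class OneShotStreamDemo {

    /** Hypothetical stand-in for the entity content: a stream without mark/reset. */
    static InputStream oneShot(String body) {
        final ByteArrayInputStream in =
            new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8));
        // Only read() is overridden, so markSupported() keeps the InputStream
        // default (false) and reset() keeps the default (throws IOException).
        return new InputStream() {
            @Override
            public int read() {
                return in.read();
            }
        };
    }

    /** Drains whatever is left in the stream and returns it as a string. */
    static String readAll(InputStream in) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            int b;
            while ((b = in.read()) != -1) {
                out.write(b);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        InputStream content = oneShot("{\"errors\":false}");
        System.out.println("first read:  '" + readAll(content) + "'");
        // Nothing is left to read: this is the "end-of-input" a parser would see.
        System.out.println("second read: '" + readAll(content) + "'");
        System.out.println("markSupported: " + content.markSupported());
        try {
            content.reset();
        } catch (IOException e) {
            System.out.println("reset() failed: " + e.getMessage());
        }
    }
}
```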

A possible solution would be the following:
Encapsulate the Response object in a wrapper object, making sure the content of 
the HttpEntity object can be parsed repeatedly.
I think the best way to do this is by using a BufferedHttpEntity[2] in the 
wrapper.
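
The buffering idea can be sketched with the stdlib alone. This is not the 
actual wrapper and not the BufferedHttpEntity implementation: BufferedBody is a 
hypothetical name, and it only shows the principle of draining the one-shot 
stream once into a byte array and handing out a fresh stream per read. In the 
real code the equivalent step would presumably be something like 
new BufferedHttpEntity(response.getEntity()).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical wrapper: buffers a one-shot body so it can be read repeatedly. */
class BufferedBody {
    private final byte[] bytes;

    /** Consumes the given stream exactly once and keeps its bytes in memory. */
    BufferedBody(InputStream oneShotContent) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = oneShotContent.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
            this.bytes = out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns a new stream over the buffered bytes on every call. */
    InputStream getContent() {
        return new ByteArrayInputStream(bytes);
    }

    /** Convenience accessor: the buffered body decoded as UTF-8. */
    String asString() {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        InputStream raw =
            new ByteArrayInputStream("{\"errors\":false}".getBytes(StandardCharsets.UTF_8));
        BufferedBody body = new BufferedBody(raw);
        // Both calls see the full body, unlike reading the raw stream twice.
        System.out.println("first parse:  " + body.asString());
        System.out.println("second parse: " + body.asString());
        // Each getContent() call starts again from the first byte.
        System.out.println("first byte again: " + (char) body.getContent().read());
    }
}
```

The trade-off is that the whole response body is held in memory, which seems 
acceptable for bulk responses but is worth keeping in mind.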

This could be done only when a retryConfiguration is set, but I guess it's 
probably better to wrap the response objects everywhere in the ElasticsearchIO 
class?

Is there a better or more elegant solution for this?

I've also found a way to control how bytes are read from the buffer in the 
Elastic docs[3], but I'm not sure it helps (it's for async calls, and I'm not 
sure they are used in ElasticsearchIO):
"... As for reading the response body, the HttpEntity#getContent method comes 
handy which returns an InputStream reading from the previously buffered 
response body. As an alternative, it is possible to provide a custom 
org.apache.http.nio.protocol.HttpAsyncResponseConsumer that controls how bytes 
are read and buffered. ..."


I'm happy to get some thoughts on a good solution.

Thanks,
Wout

[1] 
https://github.com/wscheep/beam/commit/8f2093066a2908f0472983cfc640bc7644b728d9
[2] 
https://hc.apache.org/httpcomponents-core-ga/httpcore/apidocs/org/apache/http/entity/BufferedHttpEntity.html
[3] 
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-low-usage-responses.html

> ElasticsearchIO RetryConfiguration response parse failure
> ---------------------------------------------------------
>
>                 Key: BEAM-5725
>                 URL: https://issues.apache.org/jira/browse/BEAM-5725
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-elasticsearch
>            Reporter: Wout Scheepers
>            Assignee: Wout Scheepers
>            Priority: Major
>
> When using .withRetryConfiguration() for ElasticsearchIO, I get the following 
> stacktrace:
>  
>  
> {code:java}
> Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
> at [Source: (org.apache.http.nio.entity.ContentInputStream); line: 1, column: 0]
> at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
> at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4133)
> at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3988)
> at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3058)
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.parseResponse(ElasticsearchIO.java:173)
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:177)
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1204)
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1175)
> {code}
>  
>  
> Probably the elastic response object's content stream is consumed twice, 
> resulting in a MismatchedInputException.
>  



