[
https://issues.apache.org/jira/browse/BEAM-5725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16670108#comment-16670108
]
Wout Scheepers commented on BEAM-5725:
--------------------------------------
I've been digging into this and have come up with the following so far:
As Tim already pointed out, the problem is that parseResponse(response) is
not repeatable: the content can only be consumed once, because the Elastic
Response object encapsulates an HttpEntity object.
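
To illustrate the single-consumption behaviour, here is a minimal sketch using
only java.io. The ByteArrayInputStream is just a stand-in for the entity
content, and OneShotStreamDemo and drain are hypothetical names that do not
exist in ElasticsearchIO. The second read comes back empty, which I believe is
the situation Jackson reports as "No content to map due to end-of-input".

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

class OneShotStreamDemo {
    // Reads whatever is left in the stream and returns it as a UTF-8 string.
    static String drain(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[1024];
        int n;
        while ((n = in.read(chunk)) != -1) {
            out.write(chunk, 0, n);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the response body (assumption: the real one comes from HttpEntity#getContent).
        InputStream body =
            new ByteArrayInputStream("{\"errors\":false}".getBytes(StandardCharsets.UTF_8));

        String first = drain(body);   // consumes the whole stream
        String second = drain(body);  // nothing left to read

        System.out.println("first read:  " + first);
        System.out.println("second read is empty: " + second.isEmpty());
    }
}
```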
I added a unit test that reproduces the problem[1]: it tries to insert a valid
document with the retryConfiguration set.
The test shows that handleRetry() does not have to be called for the bug to
appear.
My first thought for a solution was to call reset() on the InputStream of the
entity content; however, this is not supported by the InputStream used in
HttpEntity.
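
As a rough sketch of why reset() is not an option: a stream that keeps the
java.io.InputStream defaults reports markSupported() == false and throws an
IOException from reset(). The forwardOnly helper below is a hypothetical
stand-in for a network-backed stream, not the actual class used by the
Elasticsearch client.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

class ResetSupportDemo {
    // Hypothetical stand-in for a network-backed stream: it forwards reads
    // but keeps InputStream's default mark/reset behaviour (unsupported).
    static InputStream forwardOnly(final InputStream delegate) {
        return new InputStream() {
            @Override
            public int read() throws IOException {
                return delegate.read();
            }
        };
    }

    // Returns true only if the stream advertises mark support and reset() succeeds.
    static boolean canRewind(InputStream in) {
        if (!in.markSupported()) {
            return false;
        }
        try {
            in.reset();
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        InputStream inMemory = new ByteArrayInputStream(new byte[] {1, 2, 3});
        System.out.println("in-memory stream can rewind: " + canRewind(inMemory));
        System.out.println("forward-only stream can rewind: " + canRewind(forwardOnly(inMemory)));
    }
}
```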
A possible solution would be the following:
Encapsulate the Response object in a wrapper object that makes sure the content
of the HttpEntity object can be parsed repeatedly.
I think the best way to do this is to use a BufferedHttpEntity[2] in the
wrapper.
This could be done only when a retryConfiguration is set, but I guess it's
probably better to wrap the response objects everywhere in the ElasticsearchIO
class?
Is there a better or more elegant solution for this?
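
The wrapper idea can be sketched with plain java.io as follows. This is only an
illustration of the buffering technique under my own assumptions:
RepeatableBody is a hypothetical class name, and it copies the content into a
byte array once so that every later read gets a fresh stream. With the real
client, wrapping the entity as new BufferedHttpEntity(response.getEntity())
should give the same repeatable behaviour without a hand-written buffer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical wrapper: buffers a one-shot stream so it can be read many times.
class RepeatableBody {
    private final byte[] bytes;

    // Consumes the source stream exactly once and keeps a copy in memory.
    RepeatableBody(InputStream source) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[1024];
        int n;
        while ((n = source.read(chunk)) != -1) {
            out.write(chunk, 0, n);
        }
        this.bytes = out.toByteArray();
    }

    // Every call returns a new stream positioned at the start of the buffered content.
    InputStream getContent() {
        return new ByteArrayInputStream(bytes);
    }

    // Convenience accessor for callers that want the body as text.
    String asString() {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        InputStream oneShot =
            new ByteArrayInputStream("{\"errors\":false}".getBytes(StandardCharsets.UTF_8));
        RepeatableBody body = new RepeatableBody(oneShot);

        // Both "parses" see the full content, e.g. once for error checking and once for the retry logic.
        System.out.println(body.asString());
        System.out.println(body.asString());
    }
}
```

The trade-off is that the whole response is held in memory, which seems
acceptable for bulk responses but is worth keeping in mind.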
I've also found a way, described in the Elastic docs[3], to control how bytes
are read from the buffer, but I'm not sure it is of any help (it's for async
calls, and I'm not sure those are used in ElasticsearchIO):
"... As for reading the response body, the HttpEntity#getContent method comes
handy which returns an InputStream reading from the previously buffered
response body. As an alternative, it is possible to provide a custom
org.apache.http.nio.protocol.HttpAsyncResponseConsumer that controls how bytes
are read and buffered. ..."
I'm happy to get some thoughts on a good solution.
Thanks,
Wout
[1] https://github.com/wscheep/beam/commit/8f2093066a2908f0472983cfc640bc7644b728d9
[2] https://hc.apache.org/httpcomponents-core-ga/httpcore/apidocs/org/apache/http/entity/BufferedHttpEntity.html
[3] https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-low-usage-responses.html
> ElasticsearchIO RetryConfiguration response parse failure
> ---------------------------------------------------------
>
> Key: BEAM-5725
> URL: https://issues.apache.org/jira/browse/BEAM-5725
> Project: Beam
> Issue Type: Bug
> Components: io-java-elasticsearch
> Reporter: Wout Scheepers
> Assignee: Wout Scheepers
> Priority: Major
>
> When using .withRetryConfiguration() for ElasticsearchIO, I get the following
> stacktrace:
>
>
> {code:java}
> Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
>  at [Source: (org.apache.http.nio.entity.ContentInputStream); line: 1, column: 0]
> at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
> at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4133)
> at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3988)
> at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3058)
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.parseResponse(ElasticsearchIO.java:173)
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:177)
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1204)
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1175)
> {code}
>
>
> Probably the elastic response object's content stream is consumed twice,
> resulting in a MismatchedInputException.
>