[ 
https://issues.apache.org/jira/browse/FLINK-5353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15847946#comment-15847946
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5353 at 2/1/17 4:02 AM:
--------------------------------------------------------------------

Hi [~f.pompermaier], first, some clarification would be helpful:

1. Was the error somehow thrown in {{afterBulk}}, causing the sink to fail 
immediately? Previously we only kept the reported error, and only rethrew it 
on {{close()}} (see the sketch after this list), so what your original 
description suggests seems a bit odd to me.

2. Which Elasticsearch version are you using? We're using the Elasticsearch 
{{BulkProcessor}} internally, and the expected behaviour is that one or more 
failures in a bulk should not prevent the other, well-formed documents from 
being indexed properly. So it *might* be that an older version of 
Elasticsearch's {{BulkProcessor}} does not follow this expected behaviour.

3. Could you confirm whether the exception was 
{{ElasticsearchParseException}}? I'm trying to pin down the exception type 
thrown for malformed documents.

4. Have you tried setting the {{ignore_malformed}} option for your indices / 
fields? The mapping sketch below shows the idea. See 
https://www.elastic.co/guide/en/elasticsearch/reference/current/ignore-malformed.html
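
To make question 1 concrete, the behaviour I'm referring to follows roughly 
the pattern below. This is a minimal sketch, not the actual connector source; 
{{failureThrowable}} and the class name are illustrative:

{code:java}
import java.util.concurrent.atomic.AtomicReference;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;

public class BulkFailureRecordingSketch {

    // Failures reported by the BulkProcessor are only recorded here ...
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

    public BulkProcessor.Listener listener() {
        return new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // nothing to do before a bulk executes
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    for (BulkItemResponse item : response.getItems()) {
                        if (item.isFailed()) {
                            // keep only the first reported failure; the sink keeps running
                            failureThrowable.compareAndSet(
                                    null, new RuntimeException(item.getFailureMessage()));
                        }
                    }
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                failureThrowable.compareAndSet(null, failure);
            }
        };
    }

    // ... and only rethrown when the sink is closed.
    public void close() {
        Throwable t = failureThrowable.get();
        if (t != null) {
            throw new RuntimeException("An error occurred in the Elasticsearch sink.", t);
        }
    }
}
{code}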

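Regarding question 4, the mapping change would look roughly like the request 
below ({{my-index}}, {{my_type}} and {{some_numeric_field}} are placeholder 
names; the exact mapping syntax depends on your Elasticsearch version):

{code}
PUT /my-index
{
  "mappings": {
    "my_type": {
      "properties": {
        "some_numeric_field": {
          "type": "integer",
          "ignore_malformed": true
        }
      }
    }
  }
}
{code}

With this set, a document whose {{some_numeric_field}} cannot be parsed as an 
integer is still indexed; only the malformed field itself is skipped.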

Now, about the possible approach:
With all the different types of exceptions that may occur in Elasticsearch, 
instead of handling them case by case in the connector code, I think it would 
be reasonable to have a user-provided handler / callback for these failed 
documents and let the user decide how to handle them: either drop them, or 
re-add them to the {{RequestIndexer}}.
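
To make this concrete, here is a rough sketch of what such a user-facing 
callback could look like. The name {{FailedDocumentHandler}} and the exact 
signature are hypothetical placeholders, not anything that exists in the 
connector today:

{code:java}
import java.io.Serializable;

import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.ActionRequest;

/**
 * Hypothetical sketch of the proposed user-provided callback;
 * nothing here is decided yet.
 */
public interface FailedDocumentHandler extends Serializable {

    /**
     * Called for each document that Elasticsearch reported as failed.
     * The implementation decides what happens: return without doing
     * anything to drop the document, re-add it to the {@link RequestIndexer}
     * to retry it, or throw an exception to fail the sink.
     */
    void onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer);
}
{code}

For example, a user who wants to retry every failed document could pass 
{{(action, failure, indexer) -> indexer.add(action)}}, while a user who wants 
to drop malformed documents would simply log the failure and return.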

What do you think about this? It would be great to hear from actual ES users 
so that we can find an optimal, future-proof solution here :-)


> Elasticsearch Sink loses well-formed documents when there are malformed 
> documents
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-5353
>                 URL: https://issues.apache.org/jira/browse/FLINK-5353
>             Project: Flink
>          Issue Type: Bug
>          Components: Batch Connectors and Input/Output Formats
>    Affects Versions: 1.1.3
>            Reporter: Flavio Pompermaier
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
