[ https://issues.apache.org/jira/browse/BEAM-6304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767013#comment-16767013 ]
Alexander Hoem Rosbach commented on BEAM-6304:
----------------------------------------------
We're in a situation where we really need something like this. We're working
with data of varying quality from many external sources, which we process
through a streaming job that writes to Elasticsearch. The indices that we write
to have mappings that change over time as new fields are added or existing
fields change type, e.g. a field mapping changes from string to double. The
streaming job is not aware of these mappings and can, in some cases (due to the
varying data quality), process entities that don't conform to the mapping.
This causes the job to stall (it retries the bundle indefinitely) on an
IOException thrown from the ElasticsearchIO writer. We would like to write these
unprocessable entities to a log/db/topic so that we can handle these cases
without blocking the job.
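The "write unprocessable entities somewhere else instead of failing" idea is the usual dead-letter pattern. Below is a minimal plain-Java sketch of the catch-and-route step, assuming a hypothetical `DocumentSink` that throws `IOException` when Elasticsearch rejects a document (e.g. on a mapping conflict). `DocumentSink`, `DeadLetterWriter` and `FailedDoc` are illustrative names only, not part of ElasticsearchIO.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of a dead-letter wrapper around a document sink.
 * All type names here are hypothetical, not ElasticsearchIO API.
 */
public class DeadLetterWriter {

  /** Hypothetical sink that may reject a document, e.g. on a mapping conflict. */
  @FunctionalInterface
  public interface DocumentSink {
    void write(String jsonDoc) throws IOException;
  }

  /** A document that could not be written, paired with the error message. */
  public static final class FailedDoc {
    public final String doc;
    public final String error;

    FailedDoc(String doc, String error) {
      this.doc = doc;
      this.error = error;
    }
  }

  private final DocumentSink sink;
  private final List<FailedDoc> deadLetters = new ArrayList<>();

  public DeadLetterWriter(DocumentSink sink) {
    this.sink = sink;
  }

  /**
   * Writes each document in turn. A rejected document is recorded as a dead
   * letter instead of being rethrown, so one bad entity cannot stall the rest.
   * Returns the number of documents written successfully.
   */
  public int writeAll(List<String> docs) {
    int written = 0;
    for (String doc : docs) {
      try {
        sink.write(doc);
        written++;
      } catch (IOException e) {
        deadLetters.add(new FailedDoc(doc, e.getMessage()));
      }
    }
    return written;
  }

  /** Failed documents, to be forwarded to a log, database, or topic. */
  public List<FailedDoc> deadLetters() {
    return deadLetters;
  }
}
```

In an actual Beam pipeline the dead letters would more likely be emitted as an additional output of a `ParDo` (declared with `TupleTag`s) and written to the log/db/topic by a separate transform; the sketch only illustrates the routing decision.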
> can ElasticsearchIO add a ExceptionHandlerFn
> ---------------------------------------------
>
> Key: BEAM-6304
> URL: https://issues.apache.org/jira/browse/BEAM-6304
> Project: Beam
> Issue Type: New Feature
> Components: io-java-elasticsearch
> Affects Versions: Not applicable
> Reporter: big
> Assignee: Etienne Chauchot
> Priority: Major
> Labels: triaged
>
> I use ElasticsearchIO to write my data to Elasticsearch. However, the data
> comes from another platform and its validity is not easy to check. If we get
> invalid data, we can ignore it (even when using batch inserts, we can ignore
> the whole batch). So I would like to be able to register an
> exception-handling function to process it.
> Having read the source code of the write function in ProcessElement, I see
> that it just throws the exception, which causes my job to stop.
> On the direct runner I can catch the exception from
> pipeline.run().waitUntilFinish() and, ungracefully, force the job to run again
> in a while loop. However, when the job is deployed to Flink, this fails because
> Flink reports an exception saying that it cannot optimize the job.
> A method that lets the user decide how to handle exceptions is required.
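One possible shape for the requested user-registered handler is sketched below in plain Java. It assumes a hypothetical `BulkSink` that accepts or rejects a whole batch, and a hypothetical `ExceptionHandlerFn` (the name is borrowed from the issue title; it is not an existing ElasticsearchIO type) that decides whether a failed batch is dropped or whether the exception is rethrown, which is the current behaviour.

```java
import java.io.IOException;
import java.util.List;

/** Sketch of a batched writer with a user-registered exception handler (hypothetical API). */
public class HandledBatchWriter {

  /** What to do with a batch that failed to write. */
  public enum Decision { IGNORE, FAIL }

  /** Hypothetical handler, named after the issue title; not an existing ElasticsearchIO type. */
  @FunctionalInterface
  public interface ExceptionHandlerFn {
    Decision handle(List<String> failedBatch, IOException error);
  }

  /** Hypothetical bulk sink that either accepts or rejects a whole batch. */
  @FunctionalInterface
  public interface BulkSink {
    void writeBatch(List<String> batch) throws IOException;
  }

  private final BulkSink sink;
  private final ExceptionHandlerFn handler;

  public HandledBatchWriter(BulkSink sink, ExceptionHandlerFn handler) {
    this.sink = sink;
    this.handler = handler;
  }

  /**
   * Writes one batch. Returns true if it was written and false if the handler
   * chose to drop it; rethrows the original exception if the handler chose FAIL.
   */
  public boolean write(List<String> batch) throws IOException {
    try {
      sink.writeBatch(batch);
      return true;
    } catch (IOException e) {
      if (handler.handle(batch, e) == Decision.IGNORE) {
        return false;
      }
      throw e;
    }
  }
}
```

Returning a decision rather than swallowing exceptions by default keeps fail-fast as an explicit choice, and the handler also receives the failed batch, so it can log it or forward it elsewhere before returning `IGNORE`.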
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)