[ https://issues.apache.org/jira/browse/BEAM-6304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767214#comment-16767214 ]
Etienne Chauchot commented on BEAM-6304:
----------------------------------------
I understand your problem; nevertheless I have two questions:
Why is the bundle retried infinitely? Dataflow is supposed to retry it twice;
does Flink retry it infinitely?
I still believe you can do this out of the box with a pre-processing DoFn,
with something like the following (pseudo code, blindly written):
{code:java}
PCollection<String> toWrite = input.apply(ParDo.of(new DoFn<String, String>() {

  @Setup
  public void setup() {
    // request the mapping from the ES index with an http request
    // (with restClient for ex) and store it in a field
  }

  @ProcessElement
  public void processElement(ProcessContext context) {
    // test the validity of context.element() against the stored mapping
    if (!isCompatible(context.element())) {
      LOG.error("incompatible element: " + context.element());
    } else {
      context.output(context.element());
    }
  }
}));

toWrite.apply(ElasticsearchIO.write().with ...)
{code}
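(For completeness, a minimal sketch of what the elided write configuration could look like, using ElasticsearchIO's connection configuration; the host, index, and type names below are placeholders:)
{code:java}
ElasticsearchIO.ConnectionConfiguration conn =
    ElasticsearchIO.ConnectionConfiguration.create(
        new String[] {"http://localhost:9200"}, "my-index", "my-type");

toWrite.apply(ElasticsearchIO.write().withConnectionConfiguration(conn));
{code}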
That way you log incompatible elements and filter them out of the collection
sent to ES, and the mapping you test elements against is refreshed each time
a new instance of the DoFn is set up.
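For reference, here is a minimal sketch of what the {{@Setup}} fetch could look like with the low-level Elasticsearch {{RestClient}} (the host and index name are placeholders, and parsing/storing the mapping is left as a comment):
{code:java}
import java.io.IOException;
import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

// Runs once per DoFn instance: fetch the current index mapping so that
// processElement can validate each element against it.
@Setup
public void setup() throws IOException {
  RestClient restClient =
      RestClient.builder(new HttpHost("localhost", 9200, "http")).build();
  Response response = restClient.performRequest("GET", "/my-index/_mapping");
  String mappingJson = EntityUtils.toString(response.getEntity());
  // parse mappingJson and store the result in a field used by isCompatible(...)
  restClient.close();
}
{code}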
Am I missing something?
> can ElasticsearchIO add a ExceptionHandlerFn
> ---------------------------------------------
>
> Key: BEAM-6304
> URL: https://issues.apache.org/jira/browse/BEAM-6304
> Project: Beam
> Issue Type: New Feature
> Components: io-java-elasticsearch
> Affects Versions: Not applicable
> Reporter: big
> Assignee: Etienne Chauchot
> Priority: Major
> Labels: triaged
>
> I use ElasticsearchIO to write my data to Elasticsearch. However, the data
> comes from another platform and it is not easy to check its validity. When we
> get invalid data, we would like to ignore it (even when using batch inserts,
> we could ignore all of the invalid records). So I wish there were a registered
> exception-handling function to process it.
> From reading the source code, the write function in ProcessElement just throws
> the exception, which causes my job to stop.
> On the direct runner I can catch the exception around
> pipeline.run().waitUntilFinish() and ungracefully force the pipeline to run
> again with a while loop. However, when the job is deployed to Flink, it fails
> because Flink reports an exception that it cannot optimize the job.
> A method that lets the user decide how to process the exception is required.