[ 
https://issues.apache.org/jira/browse/BEAM-6304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767214#comment-16767214
 ] 

Etienne Chauchot commented on BEAM-6304:
----------------------------------------

I understand your problem; nevertheless I have two questions:

Why is the bundle retried infinitely? Dataflow is supposed to retry it only 
twice; does Flink retry it infinitely?
I still believe you can do this out of the box with a pre-processing DoFn, 
with something like the following (pseudo code, blindly written):
{code:java}
PCollection<String> toWrite = input.apply(ParDo.of(new DoFn<String, String>() {

  private transient String mapping;

  @Setup
  public void setup() throws IOException {
    // Request the mapping from the ES index with an HTTP request
    // (with RestClient for example) and store it.
    mapping = fetchMapping(); // placeholder helper, see the sketch below
  }

  @ProcessElement
  public void processElement(ProcessContext context) {
    // Test the validity of the element against the stored mapping.
    if (!isCompatible(context.element(), mapping)) { // placeholder check
      LOG.error("incompatible element: " + context.element());
    } else {
      context.output(context.element());
    }
  }
}));

toWrite.apply(ElasticsearchIO.write().with...);
{code}
 

That way you log the incompatible elements and filter them out of the 
collection sent to ES, and the mapping you test elements against is refreshed 
each time a DoFn instance is set up.
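
For completeness, here is a minimal sketch of what the mapping request in 
@Setup could look like, assuming the Elasticsearch low-level REST client 
(RestClient); the host, port and index name "my-index" are assumptions for 
the example:
{code:java}
import java.io.IOException;
import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

// Fetches the index mapping as a raw JSON string.
static String fetchMapping() throws IOException {
  RestClient restClient =
      RestClient.builder(new HttpHost("localhost", 9200, "http")).build();
  try {
    // GET /<index>/_mapping returns the index mapping as JSON.
    Response response =
        restClient.performRequest(new Request("GET", "/my-index/_mapping"));
    return EntityUtils.toString(response.getEntity());
  } finally {
    restClient.close();
  }
}
{code}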

Am I missing something?

> can ElasticsearchIO add a ExceptionHandlerFn 
> ---------------------------------------------
>
>                 Key: BEAM-6304
>                 URL: https://issues.apache.org/jira/browse/BEAM-6304
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-elasticsearch
>    Affects Versions: Not applicable
>            Reporter: big
>            Assignee: Etienne Chauchot
>            Priority: Major
>              Labels: triaged
>
> I use ElasticsearchIO to write my data to Elasticsearch. However, the data 
> comes from another platform and it is not easy to check its validity. When 
> we get invalid data, we want to ignore it (even when using batch inserts, we 
> can ignore all of the invalid documents). So I wish there were a registered 
> exception-handling function to process it. For now, from reading the source 
> code, the write function in ProcessElement just throws the exception, which 
> causes my job to stop. 
> I can catch the exception around pipeline.run().waitUntilFinish() on the 
> direct runner and ungracefully force it to run again with a while statement. 
> However, when the job is deployed to Flink, it fails because Flink reports 
> an exception that it cannot optimize the job.
> A method that lets the user decide how to process exceptions is needed.
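
For illustration, the ungraceful direct-runner workaround described above 
could look roughly like this sketch; the pipeline variable, the retry-forever 
policy and the exception type are assumptions:
{code:java}
// Rerun the pipeline in a loop until it finishes without throwing; this
// "while statement" workaround is only viable on the direct runner.
boolean succeeded = false;
while (!succeeded) {
  try {
    pipeline.run().waitUntilFinish();
    succeeded = true;
  } catch (RuntimeException e) {
    // A bundle failed (e.g. on an invalid document); log and retry.
    LOG.error("pipeline run failed, retrying", e);
  }
}
{code}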



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
