[ 
https://issues.apache.org/jira/browse/FLINK-30526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17652525#comment-17652525
 ] 

Andriy Redko commented on FLINK-30526:
--------------------------------------

The deprecations stem from 
https://issues.apache.org/jira/browse/FLINK-24323 and point towards using the 
Unified Sink API 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API]
 instead. Hope it helps.
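
Whichever hook the new sink ends up exposing, the decision logic from the quoted handler below can be kept separate from the connector interface. The following is only a minimal sketch under stated assumptions: FailurePolicy, Decision and classify are hypothetical names and not part of Flink or OpenSearch, and exceptions are matched by simple class name so that the example compiles without any connector dependency.

```java
import java.io.IOException;
import java.util.Set;

/** Hypothetical helper (not a Flink or OpenSearch class): decides what to do with a failed request. */
final class FailurePolicy {

    /** Possible outcomes; the names are illustrative only. */
    enum Decision { RETRY, DROP, IGNORE_KNOWN_ISSUE, LOG_ONLY }

    private final Set<String> retryable; // simple class names that should trigger a retry
    private final Set<String> droppable; // simple class names of malformed-document errors

    FailurePolicy(Set<String> retryable, Set<String> droppable) {
        this.retryable = retryable;
        this.droppable = droppable;
    }

    /** Walks the cause chain (bounded, to guard against cycles) looking for a matching class name. */
    private static boolean chainContains(Throwable failure, Set<String> simpleNames) {
        int depth = 0;
        for (Throwable t = failure; t != null && depth < 64; t = t.getCause(), depth++) {
            if (simpleNames.contains(t.getClass().getSimpleName())) {
                return true;
            }
        }
        return false;
    }

    /** Mirrors the branch order of the quoted handler: retry, drop, known issue, otherwise log. */
    Decision classify(Throwable failure) {
        if (chainContains(failure, retryable)) {
            return Decision.RETRY;
        }
        if (chainContains(failure, droppable)) {
            return Decision.DROP;
        }
        String message = failure.getMessage(); // may be null, so check before calling contains()
        if (failure instanceof IOException
                && failure.getCause() instanceof NullPointerException
                && message != null
                && message.contains("Unable to parse response body")) {
            return Decision.IGNORE_KNOWN_ISSUE;
        }
        return Decision.LOG_ONLY;
    }
}
```

A policy built with new FailurePolicy(Set.of("EsRejectedExecutionException"), Set.of("ElasticsearchParseException")) would reproduce the branches of the quoted handler. The class names to match against OpenSearch 2.0, and how the resulting decision is wired into the sink, depend on the connector version in use and are not covered by this sketch.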

> Handle failures in OpenSearch with ActionRequestFailureHandler being 
> deprecated
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-30526
>                 URL: https://issues.apache.org/jira/browse/FLINK-30526
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Opensearch
>            Reporter: Martijn Visser
>            Priority: Major
>
> {quote} Hi everyone,
> I have a streaming application that has an Elasticsearch sink.
> I upgraded Flink from 1.11 to 1.16 and also moved from ES 7 to 
> OpenSearch 2.0, and now I'm facing some deprecation issues. I hope you can 
> help me.
> In the previous version I created an ElasticsearchSink and added a failure 
> handler, which kept the sink from failing on certain exceptions.
> final ActionRequestFailureHandler failureHandler = (action, failure, restStatusCode, indexer) -> {
>     if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
>         indexer.add(action);
>     } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
>         log.warn("Got malformed document , action {}", action);
>         // malformed document; simply drop elasticsearchSinkFunction without failing sink
>     } else if (failure instanceof IOException
>             && failure.getCause() instanceof NullPointerException
>             && failure.getMessage().contains("Unable to parse response body")) {
>         // issue with ES 7 and opensearch - that does not send type - while response is waiting for it
>         // at org.elasticsearch.action.DocWriteResponse.<init>(DocWriteResponse.java:127) -- this.type = Objects.requireNonNull(type);
>         log.debug("known issue format the response for ES 7.5.1 and DB OS (opensearch) :{}", failure.getMessage());
>     } else {
>         // for all other failures, log and don't fail the sink
>         log.error("Got error while trying to perform ES action {}", action, failure);
>     }
> };
>
> final ElasticsearchSink.Builder<T> builder = new ElasticsearchSink.Builder<>(transportNodes, elasticsearchSinkFunction);
> In the new version the class ActionRequestFailureHandler is deprecated, and 
> after investigating I can't find any way to handle failures.
> The sink now fails on every failure.
> Is there anything I missed?
> Thanks in advance! 
> {quote}
> From the Apache Flink Slack channel 
> https://apache-flink.slack.com/archives/C03G7LJTS2G/p1672122873318899



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
