Martijn Visser created FLINK-30526:
--------------------------------------

             Summary: Handle failures in OpenSearch with ActionRequestFailureHandler being deprecated
                 Key: FLINK-30526
                 URL: https://issues.apache.org/jira/browse/FLINK-30526
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Opensearch
            Reporter: Martijn Visser


{quote} Hi everyone,
I have a streaming application that has an Elasticsearch sink.
I upgraded Flink from 1.11 to 1.16 and also moved from ES 7 to
OpenSearch 2.0, and now I'm facing some deprecation issues. I hope you can help me.
In the previous version I created an ElasticsearchSink and added a failure
handler, which kept the sink from failing on certain exceptions.
 final ActionRequestFailureHandler failureHandler = (action, failure, restStatusCode, indexer) -> {
            if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
                indexer.add(action);
            } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
                log.warn("Got malformed document , action {}", action);
                // malformed document; simply drop elasticsearchSinkFunction without failing sink
            } else if (failure instanceof IOException
                    && failure.getCause() instanceof NullPointerException
                    && failure.getMessage().contains("Unable to parse response body")) {
                // issue with ES 7 and opensearch - that does not send type - while response is waiting for it
                // at org.elasticsearch.action.DocWriteResponse.<init>(DocWriteResponse.java:127) -- this.type = Objects.requireNonNull(type);
                log.debug("known issue format the response for ES 7.5.1 and DB OS (opensearch) :{}", failure.getMessage());
            } else {
                // for all other failures, log and don't fail the sink
                log.error("Got error while trying to perform ES action {}", action, failure);
            }
        };
                

 final ElasticsearchSink.Builder<T> builder = new ElasticsearchSink.Builder<>(transportNodes, elasticsearchSinkFunction);
In the new version the class ActionRequestFailureHandler is deprecated, and
after investigating I can't find any way to handle failures.
The sink now fails on every failure.
Is there anything I missed?
Thanks in advance!
{quote}

From the Apache Flink Slack channel:
https://apache-flink.slack.com/archives/C03G7LJTS2G/p1672122873318899
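
For reference, the decision logic of the quoted handler can be sketched without any Flink or Elasticsearch classes. This is only an illustrative sketch, not the connector's API: FailureClassifier, Action, RejectedExecutionFailure and ParseFailure are hypothetical names, the two exception classes stand in for EsRejectedExecutionException and ElasticsearchParseException, and findThrowable is a simplified stand-in for the cause-chain search that ExceptionUtils.findThrowable is used for in the quote.

```java
import java.io.IOException;
import java.util.Optional;

/** Hypothetical stand-in for the decision logic of the quoted failure handler. */
final class FailureClassifier {

    /** What the caller should do with the failed request. */
    enum Action { RETRY, DROP, IGNORE_KNOWN_ISSUE, LOG_ONLY }

    /** Hypothetical placeholder for EsRejectedExecutionException. */
    static final class RejectedExecutionFailure extends RuntimeException {
        RejectedExecutionFailure(String message) { super(message); }
    }

    /** Hypothetical placeholder for ElasticsearchParseException. */
    static final class ParseFailure extends RuntimeException {
        ParseFailure(String message) { super(message); }
    }

    /** Walks the cause chain and returns the first throwable of the given type. */
    static <T extends Throwable> Optional<T> findThrowable(Throwable failure, Class<T> type) {
        Throwable current = failure;
        int depth = 0;
        // The depth bound guards against cyclic cause chains.
        while (current != null && depth < 64) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
            depth++;
        }
        return Optional.empty();
    }

    /** Mirrors the branch order of the quoted handler. */
    static Action classify(Throwable failure) {
        if (findThrowable(failure, RejectedExecutionFailure.class).isPresent()) {
            return Action.RETRY; // the quote re-adds the action to the indexer
        }
        if (findThrowable(failure, ParseFailure.class).isPresent()) {
            return Action.DROP; // malformed document: warn and drop
        }
        if (failure instanceof IOException
                && failure.getCause() instanceof NullPointerException
                && failure.getMessage() != null // null check added here; the quote has none
                && failure.getMessage().contains("Unable to parse response body")) {
            return Action.IGNORE_KNOWN_ISSUE; // known response-format issue
        }
        return Action.LOG_ONLY; // everything else: log without failing the sink
    }
}
```

Keeping the classification in a plain function like this means the same rules could be reused from whichever failure hook the new sink ends up exposing, which is the open question of this ticket.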



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
