[
https://issues.apache.org/jira/browse/FLINK-17623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17108771#comment-17108771
]
Yun Wang commented on FLINK-17623:
----------------------------------
The more I think about it, my preferred solution would be: make the
{{ElasticsearchSink}} class extendable.
This not only addresses the reported issue, it also gives the users more
options to write more complicated business logic (for example, having custom
state).
Let me know if this makes sense.
I'll gladly submit a PR, if you don't have the cycles to work on this at the
moment.
> Elasticsearch sink should support user resource cleanup
> -------------------------------------------------------
>
> Key: FLINK-17623
> URL: https://issues.apache.org/jira/browse/FLINK-17623
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / ElasticSearch
> Reporter: Yun Wang
> Priority: Major
> Labels: usability
>
> There should be a way for an
> [ElasticsearchSinkFunction|[https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java]]
> implementation to use resources with the same lifecycle as the Elasticsearch
> sink, for example, an
> [RestHighLevelClient|[https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high.html]].
> Currently there is no way to clean up such resources.
> This can be achieved by either of the below:
> # Expose a `close()` method in the ElasticsearchSinkFunction interface, and
> invoke the close method from [ElasticsearchSinkBase.close
> |[https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L331]].
> # Make the
> [ElasticsearchSink|[https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java]]
> class extendable.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)