[
https://issues.apache.org/jira/browse/FLINK-17623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17105055#comment-17105055
]
Yun Wang commented on FLINK-17623:
----------------------------------
[~karmagyz] Ah, I was using {{RestHighLevelClient}} as an example of any
resource whose lifecycle matches that of the sink, rather than that of a
single call to the method that processes an incoming message.
In my use case, I do need to access an instance of the {{RestHighLevelClient}}
within my {{ElasticsearchSinkFunction}} implementation, to make synchronous
read requests to ES before deciding which {{ActionRequest}} to add to the
{{RequestIndexer}}. However, the client is {{private}} in the current
{{ElasticsearchSink}} class, so I need to create a
{{RestHighLevelClient}} instance myself and close it when the sink closes.
I noticed an {{open()}} method was added to {{ElasticsearchSinkFunction}} in
[https://github.com/apache/flink/pull/11466|https://github.com/apache/flink/pull/11466/files],
hence my first suggestion of adding a {{close()}} method as well to address
the issue at hand.
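
As a rough sketch of the first suggestion: the types below are simplified, hypothetical stand-ins ({{LookupClient}}, {{SinkFunctionSketch}}, {{SinkSketch}}, {{UpsertFunction}}), not the actual Flink or Elasticsearch classes, and the {{close()}} hook is the proposed addition rather than an existing API. The sketch only illustrates how a sink could forward its own lifecycle calls to the user function, so that a client created in {{open()}} is released when the sink closes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a closeable client such as RestHighLevelClient.
final class LookupClient implements AutoCloseable {
    private boolean closed = false;

    // Pretend synchronous read: reports whether a document id already exists.
    boolean exists(String id) {
        if (closed) {
            throw new IllegalStateException("client already closed");
        }
        return id.startsWith("known-");
    }

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

// Simplified stand-in for ElasticsearchSinkFunction. open() mirrors the hook
// mentioned above; close() is the proposed (assumed) addition.
interface SinkFunctionSketch<T> {
    default void open() {}

    default void close() {}

    void process(T element, List<String> indexer);
}

// User function that owns a client with the same lifecycle as the sink.
final class UpsertFunction implements SinkFunctionSketch<String> {
    LookupClient client; // package-private so callers can inspect it

    @Override
    public void open() {
        client = new LookupClient();
    }

    @Override
    public void process(String id, List<String> indexer) {
        // Read first, then decide which request to hand to the indexer.
        indexer.add((client.exists(id) ? "update:" : "index:") + id);
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
    }
}

// Simplified stand-in for the sink: forwards lifecycle calls to the function.
final class SinkSketch<T> {
    private final SinkFunctionSketch<T> function;
    final List<String> requests = new ArrayList<>();

    SinkSketch(SinkFunctionSketch<T> function) {
        this.function = function;
    }

    void open() {
        function.open();
    }

    void invoke(T element) {
        function.process(element, requests);
    }

    void close() {
        function.close();
    }
}

final class SinkLifecycleDemo {
    public static void main(String[] args) {
        UpsertFunction function = new UpsertFunction();
        SinkSketch<String> sink = new SinkSketch<>(function);
        sink.open();
        sink.invoke("known-1");
        sink.invoke("new-2");
        sink.close();
        System.out.println(sink.requests);
        System.out.println("client closed: " + function.client.isClosed());
    }
}
```

With a hook like this, the user function would not need to know when the sink shuts down; it would only release whatever it acquired in {{open()}}.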
> Elasticsearch sink should support user resource cleanup
> -------------------------------------------------------
>
> Key: FLINK-17623
> URL: https://issues.apache.org/jira/browse/FLINK-17623
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / ElasticSearch
> Reporter: Yun Wang
> Priority: Major
> Labels: usability
>
> There should be a way for an
> [ElasticsearchSinkFunction|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java]
> implementation to use resources with the same lifecycle as the Elasticsearch
> sink, for example, an
> [RestHighLevelClient|https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high.html].
> Currently there is no way to clean up such resources.
> This can be achieved by either of the below:
> # Expose a {{close()}} method in the {{ElasticsearchSinkFunction}} interface, and
> invoke it from
> [ElasticsearchSinkBase.close|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L331].
> # Make the
> [ElasticsearchSink|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java]
> class extendable.
>