[ 
https://issues.apache.org/jira/browse/FLINK-17623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17105161#comment-17105161
 ] 

Leonard Xu commented on FLINK-17623:
------------------------------------

Hi [~yunw] 
       I got your point, currently *ElasticsearchSinkFunction* is designed as a 
simple *Function* that only can process message and can not offer the ability 
to open a resource/cleanup resource, thest ability are abstracted to 
*ElasticsearchSink*. 
      You want *ElasticsearchSinkFunction* to expose a `close` method(i.e 
server as RichFunction) so that you can simply implements a 
*ElasticsearchRichSinkFunction*(we assume this name) to meet your requirement. 
I think the requirement make sense, but the new *ElasticsearchRichSinkFunction* 
will has some duplication with *ElasticsearchSink* , we need to make this clear 
before start.

   And could you describe your more so that we can consider more detail ?

> Elasticsearch sink should support user resource cleanup
> -------------------------------------------------------
>
>                 Key: FLINK-17623
>                 URL: https://issues.apache.org/jira/browse/FLINK-17623
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / ElasticSearch
>            Reporter: Yun Wang
>            Priority: Major
>              Labels: usability
>
> There should be a way for an 
> [ElasticsearchSinkFunction|[https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java]]
>  implementation to use resources with the same lifecycle as the Elasticsearch 
> sink, for example, an 
> [RestHighLevelClient|[https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high.html]].
> Currently there is no way to clean up such resources.
> This can be achieved by either of the below:
>  # Expose a `close()` method in the ElasticsearchSinkFunction interface, and 
> invoke the close method from [ElasticsearchSinkBase.close 
> |[https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L331]].
>  # Make the 
> [ElasticsearchSink|[https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java]]
>  class extendable.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to