[
https://issues.apache.org/jira/browse/FLINK-17623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17108771#comment-17108771
]
Yun Wang edited comment on FLINK-17623 at 5/16/20, 1:46 AM:
------------------------------------------------------------
-The more I think about it, my preferred solution would be:- -make the
{{ElasticsearchSink}} class extendable-.
-This not only addresses the reported issue, it also gives the users more
options to write more complicated business logic (for example, having custom
state).-
The "make the {{ElasticsearchSink}} class extendable" solution doesn't handle
{{transient}} variable initialization gracefully. The transient variables
won't be passed into the {{ElasticsearchSinkFunction}} implementation.
Unfortunately this will be a deal-breaker for my use case, given that
{{RestHighLevelClient}} is not {{Serializable}}. I'd vote for the other 2
solutions.
Let me know if this makes sense.
I'll gladly submit a PR, if you don't have the cycles to work on this at the
moment.
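For illustration, a minimal sketch of the lifecycle-hook option, written against stand-in types so it runs without Flink or Elasticsearch on the classpath. Everything here is hypothetical: {{LifecycleSinkFunction}} stands in for {{ElasticsearchSinkFunction}}, {{SketchSink}} for {{ElasticsearchSinkBase}}, and {{FakeClient}} for a non-serializable client such as {{RestHighLevelClient}}. The {{open()}} hook is an assumption added to show where a {{transient}} field could be initialized; the issue itself only asks for {{close()}}.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

final class SinkLifecycleSketch {

    /** Hypothetical stand-in for ElasticsearchSinkFunction with lifecycle hooks. */
    interface LifecycleSinkFunction<T> extends Serializable {
        default void open() {}                      // assumed hook, not in the issue text
        void process(T element, List<String> indexer);
        default void close() {}                     // the hook the issue asks for
    }

    /** Stand-in for a non-serializable client such as RestHighLevelClient. */
    static final class FakeClient implements AutoCloseable {
        private boolean closed;

        String lookup(String key) {
            if (closed) {
                throw new IllegalStateException("client already closed");
            }
            return key.toUpperCase(Locale.ROOT);
        }

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** User function that owns a client with the same lifecycle as the sink. */
    static final class EnrichingFunction implements LifecycleSinkFunction<String> {
        private static final long serialVersionUID = 1L;

        // transient: skipped by Java serialization, so it is created in open()
        private transient FakeClient client;

        @Override
        public void open() {
            client = new FakeClient();
        }

        @Override
        public void process(String element, List<String> indexer) {
            indexer.add(client.lookup(element));
        }

        @Override
        public void close() {
            if (client != null) {
                client.close();
            }
        }

        FakeClient client() {
            return client;
        }
    }

    /** Stand-in for ElasticsearchSinkBase: forwards its lifecycle to the function. */
    static final class SketchSink<T> {
        private final LifecycleSinkFunction<T> function;
        private final List<String> requests = new ArrayList<>();

        SketchSink(LifecycleSinkFunction<T> function) {
            this.function = function;
        }

        void open() {
            function.open();
        }

        void invoke(T value) {
            function.process(value, requests);
        }

        void close() {
            function.close();
        }

        List<String> requests() {
            return requests;
        }
    }

    public static void main(String[] args) {
        EnrichingFunction fn = new EnrichingFunction();
        SketchSink<String> sink = new SketchSink<>(fn);
        sink.open();
        sink.invoke("doc-1");
        sink.close();
        System.out.println(sink.requests() + " closed=" + fn.client().isClosed());
    }
}
```

Because the hooks are default methods in this sketch, existing implementations of the interface would not need to change; only functions that own a resource override them.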
> Elasticsearch sink should support user resource cleanup
> -------------------------------------------------------
>
> Key: FLINK-17623
> URL: https://issues.apache.org/jira/browse/FLINK-17623
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / ElasticSearch
> Reporter: Yun Wang
> Priority: Major
> Labels: usability
>
> There should be a way for an
> [ElasticsearchSinkFunction|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java]
> implementation to use resources with the same lifecycle as the Elasticsearch
> sink, for example, a
> [RestHighLevelClient|https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high.html].
> Currently there is no way to clean up such resources.
> This can be achieved by either of the following:
> # Expose a `close()` method in the ElasticsearchSinkFunction interface, and
> invoke it from
> [ElasticsearchSinkBase.close|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L331].
> # Make the
> [ElasticsearchSink|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java]
> class extendable.
>