[
https://issues.apache.org/jira/browse/FLINK-17623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17105837#comment-17105837
]
Yun Wang commented on FLINK-17623:
----------------------------------
[~Leonard Xu] Sure! What I'm trying to achieve is to make synchronous read
requests to Elasticsearch from the {{process}} method of my
{{ElasticsearchSinkFunction}} implementation.
I plan to use the {{RestHighLevelClient}} for this, but I do not want to create
and close a new {{RestHighLevelClient}} instance per message inside the
{{process}} method. I would like to use a single {{RestHighLevelClient}} and
close it only when the {{ElasticsearchSink}} closes. There are a couple of
ways to achieve this:
# Create a {{RestHighLevelClient}} myself, and pass it to my
{{ElasticsearchSinkFunction}} implementation. However, there is currently no way
for me to close that {{RestHighLevelClient}} when the
{{ElasticsearchSink}} closes. In the issue description I proposed two possible
ways to close it:
## Expose a {{close}} method in the {{ElasticsearchSinkFunction}} interface,
and invoke {{ElasticsearchSinkFunction.close}} from
{{ElasticsearchSinkBase.close}}. I agree with you that this may not be the best
option because of the duplication. I only suggested it because I saw that an
{{open}} method was added recently to {{ElasticsearchSinkFunction}}.
## Make the {{ElasticsearchSink}} class extensible, so that I can override the
{{open}} and {{close}} methods defined in the {{ElasticsearchSinkBase}} class.
The constructor of the {{ElasticsearchSink}} class is currently private in favor
of the builder pattern, which also prevents the class from being extended. This
option does avoid the duplication issue you mentioned.
# As [~karmagyz] mentioned, use the same {{RestHighLevelClient}} that the
{{ElasticsearchSink}} (6.x+) is using internally, if I can get the client via a
public method. In this case, the {{RestHighLevelClient}} is cleaned up properly
by the {{ElasticsearchSink}} already.
Either of the above would work. Option 1 also covers the cleanup of any user
resource, not just the {{RestHighLevelClient}}.
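To make option 1.1 concrete, here is a minimal sketch in plain Java. It has no Flink or Elasticsearch dependencies: {{ClosableSinkFunction}}, {{SketchSinkBase}}, {{LookupClient}} and {{EnrichingSinkFunction}} are hypothetical stand-ins I made up for illustration, the {{process}} signature is simplified, and checked exceptions are left out. It only shows the lifecycle I have in mind, where the sink forwards {{open}} and {{close}} to the user function so that one client is shared across all messages.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for ElasticsearchSinkFunction with the proposed close() hook. */
interface ClosableSinkFunction<T> {
    /** Called once before any element is processed. */
    default void open() {}

    /** Turns one element into requests; the List stands in for a request indexer. */
    void process(T element, List<String> indexer);

    /** Proposed addition: called once when the sink shuts down. */
    default void close() {}
}

/** Hypothetical stand-in for ElasticsearchSinkBase that forwards its lifecycle to the function. */
class SketchSinkBase<T> {
    private final ClosableSinkFunction<T> sinkFunction;
    private final List<String> requests = new ArrayList<>();

    SketchSinkBase(ClosableSinkFunction<T> sinkFunction) {
        this.sinkFunction = sinkFunction;
    }

    void open() {
        sinkFunction.open();
    }

    void invoke(T value) {
        sinkFunction.process(value, requests);
    }

    void close() {
        // A real sink would also flush pending requests and close its own client here.
        sinkFunction.close();
    }

    List<String> requests() {
        return requests;
    }
}

/** Stand-in for a user-owned resource such as a RestHighLevelClient. */
class LookupClient implements AutoCloseable {
    private boolean closed;

    String lookup(String key) {
        if (closed) {
            throw new IllegalStateException("client already closed");
        }
        return "value-for-" + key;
    }

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

/** User function that creates one client in open() and releases it in close(). */
class EnrichingSinkFunction implements ClosableSinkFunction<String> {
    private LookupClient client;

    @Override
    public void open() {
        client = new LookupClient();
    }

    @Override
    public void process(String element, List<String> indexer) {
        // Synchronous read against the shared client, then emit one request.
        indexer.add(element + "=" + client.lookup(element));
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
    }

    LookupClient client() {
        return client;
    }
}
```

With this shape, calling {{open}}, then {{invoke}} for each message, then {{close}} on the sink creates the client once and releases it once, and the same hook would work for any other user resource.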
Let me know what you think. Any alternative solution is also welcome!
> Elasticsearch sink should support user resource cleanup
> -------------------------------------------------------
>
> Key: FLINK-17623
> URL: https://issues.apache.org/jira/browse/FLINK-17623
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / ElasticSearch
> Reporter: Yun Wang
> Priority: Major
> Labels: usability
>
> There should be a way for an
> [ElasticsearchSinkFunction|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java]
> implementation to use resources with the same lifecycle as the Elasticsearch
> sink, for example, an
> [RestHighLevelClient|https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high.html].
> Currently there is no way to clean up such resources.
> This can be achieved in either of the following ways:
> # Expose a `close()` method in the ElasticsearchSinkFunction interface, and
> invoke the close method from
> [ElasticsearchSinkBase.close|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L331].
> # Make the
> [ElasticsearchSink|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java]
> class extendable.
>