aljoscha commented on a change in pull request #9468: [FLINK-13689]
[Connectors/ElasticSearch] Fix thread leak when elasticsearch6 rest high level
cli…
URL: https://github.com/apache/flink/pull/9468#discussion_r335474108
##########
File path:
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
##########
@@ -300,6 +301,14 @@ public void open(Configuration parameters) throws
Exception {
bulkProcessor = buildBulkProcessor(new BulkProcessorListener());
requestIndexer =
callBridge.createBulkProcessorIndexer(bulkProcessor, flushOnCheckpoint,
numPendingRequests);
failureRequestIndexer = new BufferingNoOpRequestIndexer();
+
+ if(client instanceof RestHighLevelClient) {
+ RestHighLevelClient rhlClient = (RestHighLevelClient)
client;
+ if(!rhlClient.ping()) {
+ rhlClient.close();
+ throw new RuntimeException("There are no
reachable Elasticsearch nodes!");
+ }
+ }
Review comment:
I don't think this is the solution @tzulitai had in mind. He was referring
to the fact that `close()` is calling `close()` on the client. And `close()` on
the operator is called when there is a failure. So if we set the client to the
`client` field we guarantee that we successfully clean up.
Now the only question is whether to call `ping()` and how to call it. I
don't think adding a concrete dependency for ES6 6.3.1 is the solution here.
It's problematic when you use other connector versions.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services