Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/1962#discussion_r66388492
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -153,9 +165,90 @@ public ElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress>
*/
@Override
public void open(Configuration configuration) {
+ connect();
+
+ params = ParameterTool.fromMap(userConfig);
+
+ if (params.has(CONFIG_KEY_CONNECTION_RETRIES)) {
+ this.connectionRetries = params.getInt(CONFIG_KEY_CONNECTION_RETRIES);
+ }
+
+ buildBulkProcessorIndexer(client);
+ }
+
+ @Override
+ public void invoke(T element) {
+ elasticsearchSinkFunction.process(element, getRuntimeContext(), requestIndexer);
+
+ if (hasFailure.get()) {
--- End diff --
Another problem with this implementation is that we capture the
failure of a bulk operation but only retry the currently processed
element.
Would it be simpler to check, via `client.connectedNodes().size()`, whether
the Elasticsearch client is still connected to any nodes before calling
`elasticsearchSinkFunction.process(element, getRuntimeContext(), requestIndexer);`?
If it is not, we retry establishing the connection before processing the element.
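A minimal, self-contained sketch of that check-then-reconnect idea. `FakeClient` is a stand-in for the Elasticsearch `TransportClient`, and `connect()`/`connectionRetries` mirror the names in the diff; everything here is hypothetical illustration, not the PR's actual code:

```java
import java.util.ArrayList;
import java.util.List;

public class ReconnectSketch {

    // Hypothetical stub replacing the real TransportClient so the sketch compiles on its own.
    static class FakeClient {
        private final List<String> nodes = new ArrayList<>();
        private int failuresBeforeSuccess;

        FakeClient(int failuresBeforeSuccess) {
            this.failuresBeforeSuccess = failuresBeforeSuccess;
        }

        // Mirrors client.connectedNodes(): empty when the connection is lost.
        List<String> connectedNodes() {
            return nodes;
        }

        // Simulates a connect() that only succeeds after a few attempts.
        void connect() {
            if (failuresBeforeSuccess-- <= 0) {
                nodes.add("node-1");
            }
        }
    }

    // What invoke() would do before handing the element to the sink function:
    // bounded reconnect loop, then fail hard if the client still sees no nodes.
    static int ensureConnected(FakeClient client, int connectionRetries) {
        int attempts = 0;
        while (client.connectedNodes().isEmpty() && attempts < connectionRetries) {
            client.connect();
            attempts++;
        }
        if (client.connectedNodes().isEmpty()) {
            throw new RuntimeException(
                "no Elasticsearch nodes reachable after " + connectionRetries + " retries");
        }
        return attempts;
    }

    public static void main(String[] args) {
        FakeClient client = new FakeClient(2); // connect() fails twice, then succeeds
        int attempts = ensureConnected(client, 5);
        System.out.println("connected after " + attempts + " attempt(s)"); // prints 3
    }
}
```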
Another approach that might be better is to let the BulkProcessor set a
`lostConnection` flag when a batch fails with an ES connection error, and
we simply check that flag in `invoke()` before doing anything else. Even
then, we will still need a way to handle all the records in that batch that
were processed incorrectly because of the lost connection.
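A rough sketch of that flag-based alternative. The stub exception and the `afterBulkFailure` callback imitate what a `BulkProcessor.Listener` failure hook would see; the names and wiring are hypothetical, not the PR's code:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class LostConnectionSketch {

    // Shared flag set by the bulk listener, read by invoke().
    static final AtomicBoolean lostConnection = new AtomicBoolean(false);

    // Hypothetical stand-in for Elasticsearch's NoNodeAvailableException,
    // so this sketch compiles without the ES dependency.
    static class NoNodeAvailableException extends RuntimeException {}

    // What the BulkProcessor listener's failure callback would do:
    // mark the connection as lost when a batch fails with a connection error.
    static void afterBulkFailure(Throwable failure) {
        if (failure instanceof NoNodeAvailableException) {
            lostConnection.set(true);
        }
    }

    // What invoke() would check before doing anything else.
    // getAndSet(false) clears the flag so one failed batch triggers one reconnect.
    static boolean shouldReconnect() {
        return lostConnection.getAndSet(false);
    }

    public static void main(String[] args) {
        afterBulkFailure(new NoNodeAvailableException());
        System.out.println(shouldReconnect()); // prints true (and clears the flag)
        System.out.println(shouldReconnect()); // prints false
    }
}
```

Note this only tells us *that* a batch failed; as the comment says, the records already in that failed batch still need to be re-added or otherwise handled after reconnecting.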