Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2861#discussion_r93049169
--- Diff:
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
---
@@ -227,6 +264,37 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
requestIndexer = new BulkProcessorIndexer(bulkProcessor);
}
+ /**
+ * Adds all requests of the bulk to the BulkProcessor. Used when trying again.
+ * @param bulkRequest
+ */
+ public void reAddBulkRequest(BulkRequest bulkRequest) {
+ //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs.
+
+ for (IndicesRequest req : bulkRequest.subRequests()) {
+ if (req instanceof ActionRequest) {
+ // There is no waiting time between index requests, so this may produce additional pressure on cluster
+ bulkProcessor.add((ActionRequest<?>) req);
--- End diff --
Do you know whether the BulkProcessor is thread-safe? I assume multiple threads
will add bulks concurrently, because of the calls from the callbacks.
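
To illustrate the concern, here is a minimal sketch of the kind of guard that would be needed if the add path were not safe for concurrent callers. It does not use Elasticsearch or Flink at all: `GuardedBuffer` and `ConcurrentReAddSketch` are hypothetical names, and the plain `ArrayList` merely stands in for whatever internal state a bulk buffer might hold. It says nothing about how the BulkProcessor itself is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a request buffer; not part of Elasticsearch or Flink. */
class GuardedBuffer {
    // ArrayList is not thread-safe, so every access goes through a synchronized method.
    private final List<String> pending = new ArrayList<>();

    synchronized void add(String request) {
        pending.add(request);
    }

    synchronized int size() {
        return pending.size();
    }
}

/** Simulates several callback threads re-adding requests to one shared buffer. */
class ConcurrentReAddSketch {
    static int run(int threads, int requestsPerThread) {
        GuardedBuffer buffer = new GuardedBuffer();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);

        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.execute(() -> {
                try {
                    start.await(); // release all workers at once to maximize contention
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int i = 0; i < requestsPerThread; i++) {
                    buffer.add("req-" + id + "-" + i);
                }
            });
        }

        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return buffer.size();
    }
}
```

With the `synchronized` guard in place, `ConcurrentReAddSketch.run(4, 1000)` buffers all 4000 requests. Without the guard, concurrent `ArrayList.add` calls could lose entries or throw, which is the failure mode I am worried about here.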
---