GitHub user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2861#discussion_r93049169
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -227,6 +264,37 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
                requestIndexer = new BulkProcessorIndexer(bulkProcessor);
        }
     
    +   /**
    +    * Adds all requests of the bulk to the BulkProcessor. Used when trying again.
    +    * @param bulkRequest
    +    */
    +   public void reAddBulkRequest(BulkRequest bulkRequest) {
    +           //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs.
    +
    +           for (IndicesRequest req : bulkRequest.subRequests()) {
    +                   if (req instanceof ActionRequest) {
    +                           // There is no waiting time between index requests, so this may produce additional pressure on cluster
    +                           bulkProcessor.add((ActionRequest<?>) req);
    --- End diff --
    
    Do you know whether the BulkProcessor is thread-safe? I assume multiple threads will add bulks concurrently, because this method is called from the callbacks.
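
If it turns out not to be, the re-adds would need to be serialized. A minimal sketch of that idea follows. `SynchronizedIndexer`, `reAdd`, `simulate` and the `ArrayList` buffer are hypothetical stand-ins for illustration, not the Elasticsearch API, and the sketch says nothing about how the real BulkProcessor behaves.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stand-in for a bulk buffer that is not thread safe on its own.
 * This is NOT the Elasticsearch BulkProcessor; it only illustrates the locking idea.
 */
public class SynchronizedIndexer {
    // ArrayList is not thread safe, so every access goes through the lock.
    private final List<String> buffer = new ArrayList<>();
    private final Object lock = new Object();

    /** Re-adds all requests of a bulk; may be called from several callback threads. */
    public void reAdd(List<String> requests) {
        synchronized (lock) {
            buffer.addAll(requests);
        }
    }

    /** Returns the number of buffered requests. */
    public int size() {
        synchronized (lock) {
            return buffer.size();
        }
    }

    /** Simulates callback threads re-adding bulks concurrently and returns the buffered total. */
    public static int simulate(int threads, int bulksPerThread, int requestsPerBulk) {
        SynchronizedIndexer indexer = new SynchronizedIndexer();
        List<String> bulk = new ArrayList<>();
        for (int i = 0; i < requestsPerBulk; i++) {
            bulk.add("request-" + i);
        }
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int b = 0; b < bulksPerThread; b++) {
                    indexer.reAdd(bulk); // the bulk itself is only read here
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return indexer.size();
    }

    public static void main(String[] args) {
        // 4 threads * 100 bulks * 10 requests
        System.out.println(simulate(4, 100, 10)); // prints 4000
    }
}
```

Without the `synchronized` blocks, the concurrent `addAll` calls could lose requests or throw, which is the failure mode I am worried about here.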

