[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880075#comment-15880075
 ] 

ASF GitHub Bot commented on FLINK-5487:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3358#discussion_r102656903
  
    --- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
    @@ -211,6 +283,23 @@ public void invoke(T value) throws Exception {
        }
     
        @Override
    +   public void initializeState(FunctionInitializationContext context) 
throws Exception {
    +           // no initialization needed
    +   }
    +
    +   @Override
    +   public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
    +           checkErrorAndRethrow();
    +
    +           if (flushOnCheckpoint) {
    +                   do {
    +                           bulkProcessor.flush();
    --- End diff --
    
    On a second look, I think my previous statement is incorrect.
    
    To elaborate, this is the way the `BulkProcessor`'s `flush` is implemented:
    ```
    if(this.bulkRequest.numberOfActions() > 0) {
        this.execute();
    }
    ```
    
    `execute()` doesn't return until `afterBulk` is called on the listener. 
Since we can re-add requests to the bulk processor within `afterBulk`, the 
`bulkRequest.numberOfActions() > 0` will be true again and enters the loop.
    
    Therefore, the `bulkProcessor.flush()` can actually just be called once, 
and will work with our failure-handler re-adding strategy so that the flush 
also waits for re-added requests before returning. We can just check once on 
`numPendingRequests` after the flush to make sure the flush works as expected.


> Proper at-least-once support for ElasticsearchSink
> --------------------------------------------------
>
>                 Key: FLINK-5487
>                 URL: https://issues.apache.org/jira/browse/FLINK-5487
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> Discussion in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html
> Currently, the Elasticsearch Sink actually doesn't offer any guarantees for 
> message delivery.
> For proper support of at-least-once, the sink will need to participate in 
> Flink's checkpointing: when snapshotting is triggered at the 
> {{ElasticsearchSink}}, we need to synchronize on the pending ES requests by 
> flushing the internal bulk processor. For temporary ES failures (see 
> FLINK-5122) that may happen on the flush, we should retry them before 
> returning from snapshotting and acking the checkpoint. If there are 
> non-temporary ES failures on the flush, the current snapshot should fail.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to