[ https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880057#comment-15880057 ]

ASF GitHub Bot commented on FLINK-5487:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3358#discussion_r102654126
  
    --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
    @@ -211,6 +283,23 @@ public void invoke(T value) throws Exception {
        }
     
        @Override
    +   public void initializeState(FunctionInitializationContext context) throws Exception {
    +           // no initialization needed
    +   }
    +
    +   @Override
    +   public void snapshotState(FunctionSnapshotContext context) throws Exception {
    +           checkErrorAndRethrow();
    +
    +           if (flushOnCheckpoint) {
    +                   do {
    +                           bulkProcessor.flush();
    --- End diff --
    
    Ah, I see the problem here ...
    The bulk processor's internal `bulkRequest.numberOfActions() == 0` check becomes `true` as soon as the flush starts executing, not after `afterBulk` is invoked.
    
    So, since our `numPendingRequests` implementation relies on the `afterBulk` callback, we might busy-loop on `bulkProcessor.flush()` while we wait for `numPendingRequests` to become 0.
    
    This is quite a nice catch actually! So no worries on bringing it up now.
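
To make the timing issue concrete, here is a minimal, self-contained sketch. It does not use the Elasticsearch client: `FakeBulkProcessor` is a hypothetical stand-in whose buffer empties as soon as `flush()` starts, while the pending counter is only decremented later in a simulated `afterBulk` step. The 50 ms delay and all class and method names are assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

class FlushBusyLoopSketch {

    /** Hypothetical stand-in for the bulk processor; NOT the Elasticsearch API. */
    static final class FakeBulkProcessor {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        private final AtomicLong pending; // decremented only in the simulated afterBulk
        private int buffered;             // actions not yet handed to a bulk request

        FakeBulkProcessor(AtomicLong pending) {
            this.pending = pending;
        }

        synchronized void add() {
            buffered++;
            pending.incrementAndGet();
        }

        /** The buffer empties immediately; the callback runs later on another thread. */
        synchronized void flush() {
            if (buffered == 0) {
                return; // nothing buffered, so this call returns at once
            }
            final int sent = buffered;
            buffered = 0;
            executor.submit(() -> {
                try {
                    Thread.sleep(50); // assumed request latency
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                pending.addAndGet(-sent); // simulated afterBulk callback
            });
        }

        void close() {
            executor.shutdown();
        }
    }

    /** Runs the flush loop from the diff and returns how many flush() calls it issued. */
    static long flushUntilNoPending() {
        AtomicLong pending = new AtomicLong();
        FakeBulkProcessor processor = new FakeBulkProcessor(pending);
        for (int i = 0; i < 3; i++) {
            processor.add();
        }
        long flushCalls = 0;
        do {
            processor.flush(); // after the first call this is a no-op that returns immediately
            flushCalls++;
        } while (pending.get() != 0);
        processor.close();
        return flushCalls;
    }

    public static void main(String[] args) {
        System.out.println("flush() calls issued: " + flushUntilNoPending());
    }
}
```

In this simulation only the first `flush()` call does any work; every later call returns immediately and the loop spins until the delayed callback brings the counter back to 0. The exact number of calls depends on timing, so it is printed rather than stated here.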


> Proper at-least-once support for ElasticsearchSink
> --------------------------------------------------
>
>                 Key: FLINK-5487
>                 URL: https://issues.apache.org/jira/browse/FLINK-5487
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> Discussion in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html
> Currently, the Elasticsearch Sink actually doesn't offer any guarantees for 
> message delivery.
> For proper support of at-least-once, the sink will need to participate in 
> Flink's checkpointing: when snapshotting is triggered at the 
> {{ElasticsearchSink}}, we need to synchronize on the pending ES requests by 
> flushing the internal bulk processor. For temporary ES failures (see 
> FLINK-5122) that may happen on the flush, we should retry them before 
> returning from snapshotting and acking the checkpoint. If there are 
> non-temporary ES failures on the flush, the current snapshot should fail.
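
The "fail the snapshot on a non-temporary failure" part of the description can be sketched as follows. This is only an illustration under assumptions: the diff calls `checkErrorAndRethrow()` but does not show its body, so the implementation below, the `onPermanentFailure` callback, and the `trySnapshot` helper are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

class SnapshotFailureSketch {

    // Written by the asynchronous failure callback, read by the checkpointing thread.
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    /** Hypothetical callback for a bulk request that failed with a non-temporary error. */
    void onPermanentFailure(Throwable t) {
        failure.compareAndSet(null, t); // keep only the first recorded failure
    }

    /** Assumed behaviour: rethrow any recorded failure so the caller fails too. */
    void checkErrorAndRethrow() {
        Throwable cause = failure.get();
        if (cause != null) {
            throw new RuntimeException("An error occurred in the sink.", cause);
        }
    }

    /** Stand-in for snapshotState: returns true only if no failure was recorded. */
    boolean trySnapshot() {
        try {
            checkErrorAndRethrow();
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }
}
```

With this shape, a snapshot taken before any failure succeeds, and any snapshot taken after `onPermanentFailure` has been called fails, which is what prevents the checkpoint from being acknowledged.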



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
