[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15875780#comment-15875780
 ] 

ASF GitHub Bot commented on FLINK-5487:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3358#discussion_r102180098
  
    --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
    @@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }
                                @Override
                                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                                        if (response.hasFailures()) {
    -                                           for (BulkItemResponse itemResp : response.getItems()) {
    -                                                   Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
    +                                           BulkItemResponse itemResponse;
    +                                           Throwable failure;
    +
    +                                           for (int i = 0; i < response.getItems().length; i++) {
    +                                                   itemResponse = response.getItems()[i];
    +                                                   failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
                                                        if (failure != null) {
    -                                                           LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
    -                                                           failureThrowable.compareAndSet(null, failure);
    +                                                           LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
    +
    +                                                           if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
    --- End diff --
    
    I wonder whether it would be better for the `onFailure` method to throw a Throwable instead of returning a boolean.
    That would give users more flexibility in implementing their failure handler.
    
    For example, if a failure handler retries three times and then gives up, the original exception is what gets thrown. If `onFailure()` could throw its own exception, the handler could throw a custom exception that tells the user about the three retries.
    
    We can definitely discuss this, because the change is annoying to make (docs & javadocs need to be updated).
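
To make the suggestion concrete, here is a minimal sketch of a handler whose `onFailure` throws instead of returning a boolean. It is not the PR's code: `ThrowingFailureHandler`, `RetryThenFailHandler` and `RetriesExhaustedException` are hypothetical names, and a plain `String` and `List<String>` stand in for the Elasticsearch request and the `requestIndexer` so the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical contract: return normally to keep the sink running,
// throw to fail it. String / List<String> are stand-ins for the real
// request and indexer types.
interface ThrowingFailureHandler {
    void onFailure(String request, Throwable failure, List<String> requeue) throws Throwable;
}

// Hypothetical custom exception that reports how many retries were attempted.
class RetriesExhaustedException extends Exception {
    RetriesExhaustedException(String request, int retries, Throwable cause) {
        super("Request '" + request + "' still failing after " + retries + " retries", cause);
    }
}

// Re-queues a failed request up to maxRetries times, then throws its own
// exception (with the original failure as cause) instead of the raw failure.
class RetryThenFailHandler implements ThrowingFailureHandler {
    private final int maxRetries;
    private final Map<String, Integer> failuresSeen = new HashMap<>();

    RetryThenFailHandler(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    @Override
    public void onFailure(String request, Throwable failure, List<String> requeue) throws Throwable {
        int seen = failuresSeen.merge(request, 1, Integer::sum);
        if (seen > maxRetries) {
            throw new RetriesExhaustedException(request, maxRetries, failure);
        }
        requeue.add(request); // hand the request back for another attempt
    }
}
```

With a boolean return value the sink can only rethrow the original failure; with this shape the handler decides what the user sees when it gives up.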


> Proper at-least-once support for ElasticsearchSink
> --------------------------------------------------
>
>                 Key: FLINK-5487
>                 URL: https://issues.apache.org/jira/browse/FLINK-5487
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> Discussion in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html
> Currently, the Elasticsearch Sink actually doesn't offer any guarantees for 
> message delivery.
> For proper support of at-least-once, the sink will need to participate in 
> Flink's checkpointing: when snapshotting is triggered at the 
> {{ElasticsearchSink}}, we need to synchronize on the pending ES requests by 
> flushing the internal bulk processor. For temporary ES failures (see 
> FLINK-5122) that may happen on the flush, we should retry them before 
> returning from snapshotting and acking the checkpoint. If there are 
> non-temporary ES failures on the flush, the current snapshot should fail.
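
The checkpointing behaviour described above can be sketched roughly as follows. This is a simplified illustration, not Flink's implementation: `BulkClient`, `TemporaryFailureException` and `AtLeastOnceSinkSketch` are hypothetical names, requests are plain strings, and the bounded retry count is an assumption (the issue only says temporary failures should be retried).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for the Elasticsearch client.
interface BulkClient {
    void send(String request) throws Exception;
}

// Hypothetical marker for failures that are worth retrying.
class TemporaryFailureException extends Exception {
    TemporaryFailureException(String message) {
        super(message);
    }
}

class AtLeastOnceSinkSketch {
    private final BulkClient client;
    private final int maxRetries; // assumption: retries are bounded
    private final Deque<String> pending = new ArrayDeque<>();

    AtLeastOnceSinkSketch(BulkClient client, int maxRetries) {
        this.client = client;
        this.maxRetries = maxRetries;
    }

    // Buffer a request; nothing is sent until the next flush.
    void invoke(String request) {
        pending.add(request);
    }

    // Called when a checkpoint is triggered. Returns only once every pending
    // request has been sent; any exception fails the snapshot.
    void snapshotState() throws Exception {
        while (!pending.isEmpty()) {
            String request = pending.peek();
            int attempt = 0;
            while (true) {
                try {
                    client.send(request);
                    break;
                } catch (TemporaryFailureException e) {
                    if (++attempt > maxRetries) {
                        throw e; // retries exhausted: fail the snapshot
                    }
                }
                // Any other exception is non-temporary and propagates directly.
            }
            pending.remove(); // dropped only after a successful send
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Because a request leaves the buffer only after a successful send, a failed snapshot leaves it pending and the checkpoint is never acknowledged with unsent data.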



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
