[ https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879028#comment-15879028 ]

ASF GitHub Bot commented on FLINK-5487:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3358#discussion_r102546521
  
    --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.elasticsearch.util;
    +
    +import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
    +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.elasticsearch.action.ActionRequest;
    +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
    +
    +/**
    + * An {@link ActionRequestFailureHandler} that re-adds requests that failed due to temporary
    + * {@link EsRejectedExecutionException}s (which means that Elasticsearch node queues are currently full),
    + * and fails for all other failures.
    + */
    +public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
    +
    +   private static final long serialVersionUID = -7423562912824511906L;
    +
    +   @Override
    +   public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
    +           if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
    +                   indexer.add(action);
    --- End diff --
    
    Do you think this is worth a LOG.debug statement?
    Or would it happen too often / be too uninformative?
    
    I wonder if we could use the metrics system to expose things like the error rate, retry rate, etc. (Maybe we should file a JIRA to "metricify" the Elasticsearch connectors.)
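To make the LOG.debug / metrics question concrete, here is a minimal, self-contained sketch of the retry handler with both a debug-level log line and a retry counter. It deliberately avoids the real Flink and Elasticsearch classes: RejectedExecutionStandIn, Indexer and RetryingFailureHandler are hypothetical stand-ins for EsRejectedExecutionException, RequestIndexer and the handler in the diff, java.util.logging at FINE substitutes for an SLF4J LOG.debug, and an AtomicLong substitutes for a metrics counter. The status code 429 in the demo is only illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for EsRejectedExecutionException (node queue full).
class RejectedExecutionStandIn extends RuntimeException {
    RejectedExecutionStandIn(String msg) { super(msg); }
}

// Hypothetical stand-in for RequestIndexer: receives re-added requests.
interface Indexer {
    void add(String request);
}

class RetryingFailureHandler {
    private static final Logger LOG = Logger.getLogger(RetryingFailureHandler.class.getName());

    // Stand-in for a metrics counter; a real sink would register one with its metric group.
    final AtomicLong retryCount = new AtomicLong();

    // Walks the cause chain, similar in spirit to ExceptionUtils.containsThrowable.
    static boolean containsThrowable(Throwable t, Class<?> type) {
        while (t != null) {
            if (type.isInstance(t)) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }

    void onFailure(String request, Throwable failure, int restStatusCode, Indexer indexer) throws Throwable {
        if (containsThrowable(failure, RejectedExecutionStandIn.class)) {
            retryCount.incrementAndGet();
            // FINE is java.util.logging's rough equivalent of debug; the guard
            // avoids building the message string when FINE is disabled.
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("Re-adding rejected request (status " + restStatusCode + "): " + request);
            }
            indexer.add(request);
        } else {
            // Any other failure is treated as non-temporary and propagated.
            throw failure;
        }
    }
}

class RetryHandlerDemo {
    public static void main(String[] args) throws Throwable {
        List<String> requeued = new ArrayList<>();
        RetryingFailureHandler handler = new RetryingFailureHandler();
        // A wrapped rejection is still detected by walking the cause chain.
        handler.onFailure("index doc-1",
                new RuntimeException(new RejectedExecutionStandIn("queue full")), 429, requeued::add);
        System.out.println(requeued + " retries=" + handler.retryCount.get());
    }
}
```

A counter like retryCount answers the "too often / too uninformative" concern better than logging alone: the rate is visible without enabling debug output.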


> Proper at-least-once support for ElasticsearchSink
> --------------------------------------------------
>
>                 Key: FLINK-5487
>                 URL: https://issues.apache.org/jira/browse/FLINK-5487
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> Discussion in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html
> Currently, the Elasticsearch Sink actually doesn't offer any guarantees for 
> message delivery.
> For proper support of at-least-once, the sink will need to participate in 
> Flink's checkpointing: when snapshotting is triggered at the 
> {{ElasticsearchSink}}, we need to synchronize on the pending ES requests by 
> flushing the internal bulk processor. For temporary ES failures (see 
> FLINK-5122) that may happen on the flush, we should retry them before 
> returning from snapshotting and acking the checkpoint. If there are 
> non-temporary ES failures on the flush, the current snapshot should fail.
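The flush-on-snapshot behaviour described in the issue can be sketched as follows, under a deliberately simplified model. BulkClient, SendResult and AtLeastOnceBuffer are hypothetical names, not the connector's or Elasticsearch's API; invoke and snapshotState only loosely echo Flink's sink and checkpoint hooks and do not use their real signatures. The retry cap maxAttempts is an added assumption (the issue only says temporary failures should be retried), included so a persistently rejecting cluster fails the checkpoint instead of blocking it forever.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-request outcome of a bulk send.
enum SendResult { OK, TEMPORARY_FAILURE, PERMANENT_FAILURE }

// Hypothetical stand-in for a bulk client: one result per request, in order.
interface BulkClient {
    List<SendResult> send(List<String> batch);
}

class AtLeastOnceBuffer {
    private final BulkClient client;
    private final int maxAttempts; // assumption: bound on flush attempts per checkpoint
    private List<String> pending = new ArrayList<>();

    AtLeastOnceBuffer(BulkClient client, int maxAttempts) {
        this.client = client;
        this.maxAttempts = maxAttempts;
    }

    // Buffers a request, as the internal bulk processor would.
    void invoke(String request) {
        pending.add(request);
    }

    int pendingCount() {
        return pending.size();
    }

    // Called on checkpoint: returns only once every buffered request is acknowledged.
    // Temporary failures are re-queued and flushed again; a permanent failure
    // (or exhausting maxAttempts) throws, which fails the snapshot.
    void snapshotState() {
        for (int attempt = 1; !pending.isEmpty(); attempt++) {
            if (attempt > maxAttempts) {
                throw new IllegalStateException(
                        pending.size() + " requests still failing after " + maxAttempts + " attempts");
            }
            List<String> batch = pending;
            pending = new ArrayList<>();
            List<SendResult> results = client.send(batch);
            for (int i = 0; i < batch.size(); i++) {
                switch (results.get(i)) {
                    case OK:
                        break;
                    case TEMPORARY_FAILURE:
                        pending.add(batch.get(i)); // retry before acking the checkpoint
                        break;
                    case PERMANENT_FAILURE:
                        throw new IllegalStateException("Non-temporary failure for request: " + batch.get(i));
                }
            }
        }
    }
}

class AtLeastOnceDemo {
    public static void main(String[] args) {
        int[] calls = {0};
        // The first bulk call rejects "b" temporarily; the second attempt succeeds.
        BulkClient client = batch -> {
            calls[0]++;
            List<SendResult> results = new ArrayList<>();
            for (String r : batch) {
                results.add(calls[0] == 1 && r.equals("b") ? SendResult.TEMPORARY_FAILURE : SendResult.OK);
            }
            return results;
        };
        AtLeastOnceBuffer sink = new AtLeastOnceBuffer(client, 3);
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshotState();
        System.out.println("pending=" + sink.pendingCount() + " bulkCalls=" + calls[0]);
    }
}
```

Because snapshotState() blocks until the buffer is empty, a completed checkpoint implies every record before the barrier was acknowledged; records may be sent more than once on retry, which is exactly the at-least-once (not exactly-once) guarantee.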



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
