[ https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15848072#comment-15848072 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-5122 at 2/1/17 7:09 AM:
--------------------------------------------------------------------

I would like to handle this issue together with FLINK-5353 using a different 
approach: let the user provide a {{FailedActionRequestHandler}} that implements 
how to deal with a failed action request, e.g. drop it or re-add it to the 
{{BulkProcessor}}.

The reason for this is that there is quite a variety of reasons an action 
request can fail, and whether a failure can be treated as "temporary" differs 
from case to case. For example, in FLINK-5353, malformed documents can be 
considered somewhat "temporary" if the erroneous field is reprocessed. Instead 
of handling these cases one by one, I propose to let users implement the logic 
for them.

The handler will look something like this:

{code}
public interface FailedActionRequestHandler {
    boolean onFailure(ActionRequest originalRequest, Throwable failure, RequestIndexer indexer);
}
{code}

The {{ElasticsearchSink}} will still retry a bulk request (with backoff) for 
obvious temporary errors like {{EsRejectedExecutionException}}, and will only 
call {{onFailure}} after those retries are exhausted. There the user can decide 
whether to re-add the request through the {{RequestIndexer}} or simply drop it. 
The method should return {{true}} or {{false}} depending on whether the sink 
should be failed because of that error.
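
To make the idea more concrete, here is a rough sketch of what a user-provided 
handler could look like. It only compiles against the proposed interface above; 
the concrete exception types checked and the {{RequestIndexer}} import are 
illustrative assumptions (the package may differ), and the return values follow 
the semantics described above ({{true}} = fail the sink):

{code}
// Illustrative sketch only -- assumes the proposed FailedActionRequestHandler
// interface above, RequestIndexer#add(ActionRequest...), and "return true"
// meaning "fail the sink".
import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

public class ExampleFailureHandler implements FailedActionRequestHandler {

    @Override
    public boolean onFailure(ActionRequest originalRequest, Throwable failure, RequestIndexer indexer) {
        if (failure instanceof EsRejectedExecutionException) {
            // Bulk queue was full: treat as temporary and re-add for the next bulk request.
            indexer.add(originalRequest);
            return false;
        } else if (failure instanceof ElasticsearchParseException) {
            // Malformed document (the FLINK-5353 case): drop it and keep the sink running.
            return false;
        } else {
            // Anything unexpected: fail the sink.
            return true;
        }
    }
}
{code}

With something like this, the rejected-execution case in this issue could be 
handled by re-adding the request, while FLINK-5353's malformed documents could 
simply be dropped (or fixed and re-added).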

What do you think? Sorry for being picky about how to resolve this. I think 
it'll be best to find a good long-term solution. From the current state of the 
ES issues, I have a feeling that things will start to get unmaintainable once 
new exception handling cases pop up, so it'll be helpful to know what actual 
ES Flink users think of the idea.


> Elasticsearch Sink loses documents when cluster has high load
> -------------------------------------------------------------
>
>                 Key: FLINK-5122
>                 URL: https://issues.apache.org/jira/browse/FLINK-5122
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.2.0
>            Reporter: static-max
>            Assignee: static-max
>
> My cluster had high load and documents did not get indexed. This violates the "at 
> least once" semantics of the ES connector.
> I put pressure on my cluster to test Flink, which caused new indices to be 
> created and rebalanced. On those errors the bulk request should be retried 
> instead of being discarded.
> Primary shard not active because ES decided to rebalance the index:
> 2016-11-15 15:35:16,123 ERROR org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - Failed to index document in Elasticsearch: UnavailableShardsException[[index-name][3] primary shard is not active Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] requests]]
> Bulk queue on node full (I set queue to a low value to reproduce error):
> 22:37:57,702 ERROR org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - Failed to index document in Elasticsearch: RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]]; nested: EsRejectedExecutionException[rejected execution of org.elasticsearch.transport.TransportService$4@727e677c on EsThreadPoolExecutor[bulk, queue capacity = 1, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 2939]]];
> I can try to propose a PR for this.



