[
https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15848072#comment-15848072
]
Tzu-Li (Gordon) Tai edited comment on FLINK-5122 at 2/1/17 7:09 AM:
--------------------------------------------------------------------
I would like to handle this issue together with FLINK-5353 with a different
approach: let the user provide a {{FailedActionRequestHandler}} that implements
how to deal with a failed action request, e.g. drop it or re-add it to the
{{BulkProcessor}}.
The reason is that there is quite a variety of reasons an action request can
fail, and each case can be considered "temporary" in a different way. For
example, in FLINK-5353, a malformed document can be seen as "temporary" if the
erroneous field is reprocessed. Instead of handling these case by case, I
propose to let the user implement the logic for them.
The handler would look something like this:
{code}
public interface FailedActionRequestHandler {
    boolean onFailure(ActionRequest originalRequest,
                      Throwable failure,
                      RequestIndexer indexer);
}
{code}
The ElasticsearchSink will still retry a bulk request (with backoff) for
obviously temporary errors like {{EsRejectedExecutionException}}, and will only
call {{onFailure}} after those retries are exhausted. There the user can decide
whether to re-add the request through the {{RequestIndexer}} or simply drop it.
The method returns {{true}} or {{false}} depending on whether the failure
should fail the sink.
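To make the idea concrete, here is a minimal sketch of one possible handler implementation: re-queue requests rejected by a full bulk queue, fail the sink on anything else. The {{ActionRequest}}, {{RequestIndexer}}, and {{EsRejectedExecutionException}} stubs below are placeholders for the real Flink/Elasticsearch types, included only so the sketch compiles on its own:

```java
// Stubs standing in for the real Flink/Elasticsearch types (sketch only).
interface ActionRequest {}
interface RequestIndexer { void add(ActionRequest request); }
class EsRejectedExecutionException extends RuntimeException {}

interface FailedActionRequestHandler {
    boolean onFailure(ActionRequest originalRequest,
                      Throwable failure,
                      RequestIndexer indexer);
}

/** Re-queue bulk-queue rejections; treat any other failure as fatal. */
class RetryRejectedExecutionHandler implements FailedActionRequestHandler {
    @Override
    public boolean onFailure(ActionRequest originalRequest,
                             Throwable failure,
                             RequestIndexer indexer) {
        if (failure instanceof EsRejectedExecutionException) {
            indexer.add(originalRequest); // re-add; sent with a later bulk request
            return false;                 // do not fail the sink
        }
        return true;                      // unknown error: fail the sink
    }
}

public class Main {
    public static void main(String[] args) {
        java.util.List<ActionRequest> requeued = new java.util.ArrayList<>();
        RequestIndexer indexer = requeued::add;
        FailedActionRequestHandler handler = new RetryRejectedExecutionHandler();

        ActionRequest req = new ActionRequest() {};
        boolean failSink =
            handler.onFailure(req, new EsRejectedExecutionException(), indexer);
        System.out.println("failSink=" + failSink + ", requeued=" + requeued.size());
    }
}
```

A user who instead wants at-most-once delivery under load could simply return {{false}} without re-adding the request, which is exactly the kind of policy decision this interface delegates to the user.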
What do you think? Sorry for being picky about how to resolve this. I think it
is best to find a good long-term solution: from the current state of the ES
issues I have a feeling that things will start to become unmaintainable once
new exception handling cases pop up, so it would be helpful to know what
actual ES Flink users think of the idea.
> Elasticsearch Sink loses documents when cluster has high load
> -------------------------------------------------------------
>
> Key: FLINK-5122
> URL: https://issues.apache.org/jira/browse/FLINK-5122
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Affects Versions: 1.2.0
> Reporter: static-max
> Assignee: static-max
>
> My cluster was under high load and documents were not indexed. This violates
> the "at least once" semantics of the ES connector.
> I put pressure on my cluster to test Flink, causing new indices to be created
> and rebalanced. On those errors the bulk request should be retried instead of
> being discarded.
> Primary shard not active because ES decided to rebalance the index:
> 2016-11-15 15:35:16,123 ERROR
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink -
> Failed to index document in Elasticsearch:
> UnavailableShardsException[[index-name][3] primary shard is not active
> Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20]
> requests]]
> Bulk queue on node full (I set queue to a low value to reproduce error):
> 22:37:57,702 ERROR
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink -
> Failed to index document in Elasticsearch:
> RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]];
> nested: EsRejectedExecutionException[rejected execution of
> org.elasticsearch.transport.TransportService$4@727e677c on
> EsThreadPoolExecutor[bulk, queue capacity = 1,
> org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running,
> pool size = 2, active threads = 2, queued tasks = 1, completed tasks =
> 2939]]];
> I can try to propose a PR for this.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)