[ 
https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15842574#comment-15842574
 ] 

ASF GitHub Bot commented on FLINK-5122:
---------------------------------------

Github user static-max commented on the issue:

    https://github.com/apache/flink/pull/2861
  
    In my tests BulkItemResponse.getFailure().getCause() returns a 
RemoteTransportException like this:
    
`RemoteTransportException[[Harrier][127.0.0.1:9302][indices:data/write/bulk[s]]];
 nested: 
RemoteTransportException[[Harrier][127.0.0.1:9302][indices:data/write/bulk[s][p]]];
 nested: EsRejectedExecutionException[rejected execution of 
org.elasticsearch.transport.TransportService$4@3a0f3a6e on 
EsThreadPoolExecutor[bulk, queue capacity = 2, 
org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@5ac266bd[Running, 
pool size = 8, active threads = 8, queued tasks = 2, completed tasks = 206]]];`
    
    So the nested Exception needs to be checked. That's possible, I will 
implement that change.
    
    The last to Exceptions are common when a new Index gets created (if you 
have new index by day for example), or when a node leaves the cluster and no 
master can be elected (no quorum),


> Elasticsearch Sink loses documents when cluster has high load
> -------------------------------------------------------------
>
>                 Key: FLINK-5122
>                 URL: https://issues.apache.org/jira/browse/FLINK-5122
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.2.0
>            Reporter: static-max
>            Assignee: static-max
>
> My cluster had high load and documents got not indexed. This violates the "at 
> least once" semantics in the ES connector.
> I gave pressure on my cluster to test Flink, causing new indices to be 
> created and balanced. On those errors the bulk should be tried again instead 
> of being discarded.
> Primary shard not active because ES decided to rebalance the index:
> 2016-11-15 15:35:16,123 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> UnavailableShardsException[[index-name][3] primary shard is not active 
> Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] 
> requests]]
> Bulk queue on node full (I set queue to a low value to reproduce error):
> 22:37:57,702 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]];
>  nested: EsRejectedExecutionException[rejected execution of 
> org.elasticsearch.transport.TransportService$4@727e677c on 
> EsThreadPoolExecutor[bulk, queue capacity = 1, 
> org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running,
>  pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 
> 2939]]];
> I can try to propose a PR for this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to