[ 
https://issues.apache.org/jira/browse/FLINK-14938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17059695#comment-17059695
 ] 

Shengnan YU commented on FLINK-14938:
-------------------------------------

The easiest way to solve this issue is to use a ConcurrentLinkedQueue instead of an
ArrayList in BufferingNoOpRequestIndexer. However, using a concurrent queue 'may'
affect performance for users who have no failure handler, or whose handler has
nothing to do with concurrency. Therefore I'd like to create a new class,
ConcurrentBufferingNoOpRequestIndexer, which uses a concurrent queue, and let users
decide which RequestIndexer to use when they build the ElasticsearchSink.
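A minimal sketch of the proposed ConcurrentBufferingNoOpRequestIndexer idea, written without Flink or Elasticsearch dependencies so it runs standalone. `SimpleIndexer`, `ConcurrentBufferingIndexer`, and plain `String` requests are hypothetical stand-ins for `RequestIndexer`, the proposed class, and `ActionRequest`. This is not Flink's actual code. The drain uses `poll()` instead of "iterate, then `clear()`". With iterate-then-clear, a request that another thread adds between the end of the loop and `clear()` could be discarded. The drain is also bounded to the requests present when it starts. A request that keeps failing and being re-added therefore waits for the next call instead of looping forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentBufferingSketch {

    /** Hypothetical stand-in for Flink's RequestIndexer; requests are plain Strings here. */
    interface SimpleIndexer {
        void add(String request);
    }

    /** Hypothetical ConcurrentBufferingNoOpRequestIndexer backed by a ConcurrentLinkedQueue. */
    static final class ConcurrentBufferingIndexer implements SimpleIndexer {
        private final Queue<String> bufferedRequests = new ConcurrentLinkedQueue<>();

        @Override
        public void add(String request) {
            // Safe to call from a failure handler, even while a drain is in progress.
            bufferedRequests.add(request);
        }

        int numberOfBufferedRequests() {
            return bufferedRequests.size();
        }

        /** Forwards only the requests buffered when the drain starts; re-adds wait for the next call. */
        void processBufferedRequests(SimpleIndexer actualIndexer) {
            int toProcess = bufferedRequests.size();
            for (int i = 0; i < toProcess; i++) {
                String request = bufferedRequests.poll(); // removes as it goes, no clear() needed
                if (request == null) {
                    break; // queue emptied early
                }
                actualIndexer.add(request);
            }
        }
    }

    public static void main(String[] args) {
        ConcurrentBufferingIndexer buffer = new ConcurrentBufferingIndexer();
        buffer.add("doc-1");
        buffer.add("doc-2");

        List<String> sent = new ArrayList<>();
        // Simulated actual indexer: sending doc-1 "fails", and the failure handler
        // re-adds it to the very buffer that is currently being drained.
        buffer.processBufferedRequests(request -> {
            sent.add(request);
            if (request.equals("doc-1")) {
                buffer.add(request);
            }
        });

        // prints "sent=[doc-1, doc-2], still buffered=1"
        System.out.println("sent=" + sent + ", still buffered=" + buffer.numberOfBufferedRequests());
    }
}
```

The same-thread re-add above is the easiest case to show deterministically. ConcurrentLinkedQueue is also safe when the re-add comes from a different thread.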

 

What do you think of this solution? [~rmetzger] Thank you very much.

> Flink elasticsearch failure handler re-add indexrequest causes 
> ConcurrentModificationException
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14938
>                 URL: https://issues.apache.org/jira/browse/FLINK-14938
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / ElasticSearch
>    Affects Versions: 1.8.1
>            Reporter: Shengnan YU
>            Assignee: Shengnan YU
>            Priority: Major
>
>  
> When using the Elasticsearch connector failure handler (from the official example)
> to re-add documents, Flink encountered a ConcurrentModificationException.
> {code:java}
> input.addSink(new ElasticsearchSink<>(
>     config, transportAddresses,
>     new ElasticsearchSinkFunction<String>() {...},
>     new ActionRequestFailureHandler() {
>         @Override
>         public void onFailure(ActionRequest action,
>                 Throwable failure,
>                 int restStatusCode,
>                 RequestIndexer indexer) throws Throwable {
>             if (ExceptionUtils.findThrowable(failure,
>                     EsRejectedExecutionException.class).isPresent()) {
>                 // full queue; re-add document for indexing
>                 indexer.add(action);
>             } else {
>                 // any other failure: rethrow so the sink fails instead of dropping the request
>                 throw failure;
>             }
>         }
>     }));
> {code}
> I found that the method BufferingNoOpRequestIndexer$processBufferedRequests
> iterates over a list of ActionRequests. However, the failure handler keeps
> re-adding requests to that same list after a bulk request fails, which causes a
> ConcurrentModificationException.
> {code:java}
> void processBufferedRequests(RequestIndexer actualIndexer) {
>    for (ActionRequest request : bufferedRequests) {
>       if (request instanceof IndexRequest) {
>          actualIndexer.add((IndexRequest) request);
>       } else if (request instanceof DeleteRequest) {
>          actualIndexer.add((DeleteRequest) request);
>       } else if (request instanceof UpdateRequest) {
>          actualIndexer.add((UpdateRequest) request);
>       }
>    }
>    bufferedRequests.clear();
> }
> {code}
> I think this is a multi-threading bug. Is it OK to use a concurrent queue
> to hold the failed requests?
>  
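A minimal, Flink-free reproduction of the failure pattern described in the issue: a for-each loop over an ArrayList whose body causes an element to be appended to the same list. The append here stands in for a failure handler calling `indexer.add(action)` while `processBufferedRequests` is still iterating. The class and method names are hypothetical. The re-add happens on the same thread so the exception is deterministic. The cross-thread variant suspected in the report would hit the same fail-fast iterator check, but only intermittently.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

public class CmeRepro {

    /** Returns true if re-adding during iteration throws ConcurrentModificationException. */
    static boolean reAddDuringIteration() {
        List<String> bufferedRequests = new ArrayList<>();
        bufferedRequests.add("doc-1");
        bufferedRequests.add("doc-2");
        try {
            for (String request : bufferedRequests) {
                // Stand-in for actualIndexer.add(...) leading to a failed bulk whose
                // failure handler re-adds the request to this same buffer.
                if (request.equals("doc-1")) {
                    bufferedRequests.add(request);
                }
            }
            bufferedRequests.clear();
            return false;
        } catch (ConcurrentModificationException e) {
            // ArrayList's fail-fast iterator detects the structural change on the next next() call.
            return true;
        }
    }

    public static void main(String[] args) {
        // prints "ConcurrentModificationException thrown: true"
        System.out.println("ConcurrentModificationException thrown: " + reAddDuringIteration());
    }
}
```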



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
