Shengnan YU created FLINK-14938:
-----------------------------------
Summary: Flink elasticsearch failure handler re-add indexrequest
causes ConcurrentModificationException
Key: FLINK-14938
URL: https://issues.apache.org/jira/browse/FLINK-14938
Project: Flink
Issue Type: Bug
Components: Connectors / ElasticSearch
Affects Versions: 1.8.1
Reporter: Shengnan YU
When use Elasticsearch connector failure handler (from official example) to
re-add documents, Flink encountered ConcurrentModificationException.
{code:java}
input.addSink(new ElasticsearchSink<>(
config, transportAddresses,
new ElasticsearchSinkFunction<String>() {...},
new ActionRequestFailureHandler() {
@Override
void onFailure(ActionRequest action,
Throwable failure,
int restStatusCode,
RequestIndexer indexer) throw Throwable {
if (ExceptionUtils.findThrowable(failure,
EsRejectedExecutionException.class).isPresent()) {
// full queue; re-add document for indexing
indexer.add(action);
}
}
}));
{code}
I found that in method BufferingNoOpRequestIndexer$processBufferedRequests, it
will iterator a list of ActionRequest. However the failure handler will keep
re-adding request to that list after bulk, which causes
ConcurrentModificationException.
I think it should be a multi-thread bug and need to find a thread-safe List to
maintain the failure request?
--
This message was sent by Atlassian Jira
(v8.3.4#803005)