[
https://issues.apache.org/jira/browse/FLINK-14938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Shengnan YU updated FLINK-14938:
--------------------------------
Description:
When use Elasticsearch connector failure handler (from official example) to
re-add documents, Flink encountered ConcurrentModificationException.
{code:java}
input.addSink(new ElasticsearchSink<>(
config, transportAddresses,
new ElasticsearchSinkFunction<String>() {...},
new ActionRequestFailureHandler() {
@Override
void onFailure(ActionRequest action,
Throwable failure,
int restStatusCode,
RequestIndexer indexer) throw Throwable {
if (ExceptionUtils.findThrowable(failure,
EsRejectedExecutionException.class).isPresent()) {
// full queue; re-add document for indexing
indexer.add(action);
}
}
}));
{code}
I found that in method BufferingNoOpRequestIndexer$processBufferedRequests, it
will iterator a list of ActionRequest. However the failure handler will keep
re-adding request to that list after bulk, which causes
ConcurrentModificationException.
{code:java}
void processBufferedRequests(RequestIndexer actualIndexer) {
for (ActionRequest request : bufferedRequests) {
if (request instanceof IndexRequest) {
actualIndexer.add((IndexRequest) request);
} else if (request instanceof DeleteRequest) {
actualIndexer.add((DeleteRequest) request);
} else if (request instanceof UpdateRequest) {
actualIndexer.add((UpdateRequest) request);
}
}
bufferedRequests.clear();
}{code}
I think it should be a multi-thread bug and need to find a thread-safe List to
maintain the failure request?
was:
When use Elasticsearch connector failure handler (from official example) to
re-add documents, Flink encountered ConcurrentModificationException.
{code:java}
input.addSink(new ElasticsearchSink<>(
config, transportAddresses,
new ElasticsearchSinkFunction<String>() {...},
new ActionRequestFailureHandler() {
@Override
void onFailure(ActionRequest action,
Throwable failure,
int restStatusCode,
RequestIndexer indexer) throw Throwable {
if (ExceptionUtils.findThrowable(failure,
EsRejectedExecutionException.class).isPresent()) {
// full queue; re-add document for indexing
indexer.add(action);
}
}
}));
{code}
I found that in method BufferingNoOpRequestIndexer$processBufferedRequests, it
will iterator a list of ActionRequest. However the failure handler will keep
re-adding request to that list after bulk, which causes
ConcurrentModificationException.
I think it should be a multi-thread bug and need to find a thread-safe List to
maintain the failure request?
> Flink elasticsearch failure handler re-add indexrequest causes
> ConcurrentModificationException
> ----------------------------------------------------------------------------------------------
>
> Key: FLINK-14938
> URL: https://issues.apache.org/jira/browse/FLINK-14938
> Project: Flink
> Issue Type: Bug
> Components: Connectors / ElasticSearch
> Affects Versions: 1.8.1
> Reporter: Shengnan YU
> Priority: Major
>
>
> When use Elasticsearch connector failure handler (from official example) to
> re-add documents, Flink encountered ConcurrentModificationException.
> {code:java}
> input.addSink(new ElasticsearchSink<>(
> config, transportAddresses,
> new ElasticsearchSinkFunction<String>() {...},
> new ActionRequestFailureHandler() {
> @Override
> void onFailure(ActionRequest action,
> Throwable failure,
> int restStatusCode,
> RequestIndexer indexer) throw Throwable {
> if (ExceptionUtils.findThrowable(failure,
> EsRejectedExecutionException.class).isPresent()) {
> // full queue; re-add document for indexing
> indexer.add(action);
> }
> }
> }));
> {code}
> I found that in method BufferingNoOpRequestIndexer$processBufferedRequests,
> it will iterator a list of ActionRequest. However the failure handler will
> keep re-adding request to that list after bulk, which causes
> ConcurrentModificationException.
> {code:java}
> void processBufferedRequests(RequestIndexer actualIndexer) {
> for (ActionRequest request : bufferedRequests) {
> if (request instanceof IndexRequest) {
> actualIndexer.add((IndexRequest) request);
> } else if (request instanceof DeleteRequest) {
> actualIndexer.add((DeleteRequest) request);
> } else if (request instanceof UpdateRequest) {
> actualIndexer.add((UpdateRequest) request);
> }
> }
> bufferedRequests.clear();
> }{code}
> I think it should be a multi-thread bug and need to find a thread-safe List
> to maintain the failure request?
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)