This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 44ea896 [FLINK-14938] Use ConcurrentLinkedQueue in
BufferingNoOpRequestIndexer
44ea896 is described below
commit 44ea896d6c5bbfcabb79d9649dd15c834741c4b9
Author: yushengnan <[email protected]>
AuthorDate: Fri Jun 19 21:45:52 2020 +0800
[FLINK-14938] Use ConcurrentLinkedQueue in BufferingNoOpRequestIndexer
This solves the problem of concurrent modification when re-adding ES
index requests from a failure handler.
---
.../connectors/elasticsearch/BufferingNoOpRequestIndexer.java | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BufferingNoOpRequestIndexer.java
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BufferingNoOpRequestIndexer.java
index e639b82..07341da 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BufferingNoOpRequestIndexer.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BufferingNoOpRequestIndexer.java
@@ -27,9 +27,8 @@ import org.elasticsearch.action.update.UpdateRequest;
import javax.annotation.concurrent.NotThreadSafe;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Implementation of a {@link RequestIndexer} that buffers {@link
ActionRequest ActionRequests}
@@ -39,10 +38,10 @@ import java.util.List;
@NotThreadSafe
class BufferingNoOpRequestIndexer implements RequestIndexer {
- private List<ActionRequest> bufferedRequests;
+ private ConcurrentLinkedQueue<ActionRequest> bufferedRequests;
BufferingNoOpRequestIndexer() {
- this.bufferedRequests = new ArrayList<>(10);
+ this.bufferedRequests = new
ConcurrentLinkedQueue<ActionRequest>();
}
@Override