This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 20bb42c1c4b Fix concurrent problem (NPE) of AsyncRequestManager (#14004) (#14034)
20bb42c1c4b is described below
commit 20bb42c1c4bd42c2fadc0e0bcedc1304d2824eaf
Author: Li Yu Heng <[email protected]>
AuthorDate: Sat Nov 9 16:14:17 2024 +0800
Fix concurrent problem (NPE) of AsyncRequestManager (#14004) (#14034)
(cherry picked from commit 23b914d8da9bdb4f9ca6aa439e4edc5f8b7dcbc0)
---
.../iotdb/commons/client/request/AsyncRequestContext.java | 6 +++---
.../iotdb/commons/client/request/AsyncRequestManager.java | 10 ++++++----
2 files changed, 9 insertions(+), 7 deletions(-)
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestContext.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestContext.java
index 8b54072a4c4..be889b42aa6 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestContext.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestContext.java
@@ -51,7 +51,7 @@ public class AsyncRequestContext<Request, Response, RequestType, NodeLocation> {
* <p>All kinds of AsyncHandler will remove its targetNode from the nodeLocationMap only if its
* corresponding RPC request success
*/
- private final Map<Integer, NodeLocation> nodeLocationMap;
+ private final ConcurrentHashMap<Integer, NodeLocation> nodeLocationMap;
/**
* Map key: The indices(targetNode's ID) of asynchronous RPC requests.
@@ -84,7 +84,7 @@ public class AsyncRequestContext<Request, Response, RequestType, NodeLocation> {
/** Constructor for null requests. */
public AsyncRequestContext(RequestType requestType, Map<Integer, NodeLocation> nodeLocationMap) {
this.requestType = requestType;
- this.nodeLocationMap = nodeLocationMap;
+ this.nodeLocationMap = new ConcurrentHashMap<>(nodeLocationMap);
this.requestMap = new ConcurrentHashMap<>();
this.responseMap = new ConcurrentHashMap<>();
}
@@ -93,7 +93,7 @@ public class AsyncRequestContext<Request, Response, RequestType, NodeLocation> {
public AsyncRequestContext(
RequestType requestType, Request request, Map<Integer, NodeLocation> nodeLocationMap) {
this.requestType = requestType;
- this.nodeLocationMap = nodeLocationMap;
+ this.nodeLocationMap = new ConcurrentHashMap<>(nodeLocationMap);
this.requestMap = new ConcurrentHashMap<>();
this.nodeLocationMap.keySet().forEach(nodeId -> this.requestMap.put(nodeId, request));
this.responseMap = new ConcurrentHashMap<>();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
index 9ca809a4d8c..3cca39bb635 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
@@ -114,10 +114,12 @@ public abstract class AsyncRequestManager<RequestType, NodeLocation, Client> {
requestContext.resetCountDownLatch();
// Send requests to all targetNodes
- for (int requestId : requestContext.getRequestIndices()) {
- NodeLocation targetNode = requestContext.getNodeLocation(requestId);
- sendAsyncRequest(requestContext, requestId, targetNode, retry);
- }
+ final int finalRetry = retry;
+ requestContext
+ .getNodeLocationMap()
+ .forEach(
+ (requestId, nodeLocation) ->
+ sendAsyncRequest(requestContext, requestId, nodeLocation, finalRetry));
// Wait for this batch of asynchronous RPC requests finish
try {