This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1a13919d5960ee4020907c4129782976231bb2f3
Author: shuwenwei <[email protected]>
AuthorDate: Tue Jul 29 20:48:38 2025 +0800

    Fix concurrent modification of non-thread-safe data structures caused by 
parallel dispatching
    
    (cherry picked from commit ee9d7d1b04e080ec08f356ec1949f717e882604c)
---
 .../apache/iotdb/db/queryengine/common/MPPQueryContext.java    | 10 +++++-----
 .../plan/scheduler/FragmentInstanceDispatcherImpl.java         |  9 +++++++--
 2 files changed, 12 insertions(+), 7 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index b4bd87b2e7e..d0b7a072b54 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -34,10 +34,9 @@ import org.apache.tsfile.read.filter.basic.Filter;
 
 import java.time.ZoneId;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * This class is used to record the context of a query including QueryId, 
query statement, session
@@ -66,7 +65,8 @@ public class MPPQueryContext {
   // When some DataNode cannot be connected, its endPoint will be put
   // in this list. And the following retry will avoid planning fragment
   // onto this node.
-  private final List<TEndPoint> endPointBlackList;
+  // When dispatch FI fails, this structure may be modified concurrently
+  private final Set<TEndPoint> endPointBlackList;
 
   private final TypeProvider typeProvider = new TypeProvider();
 
@@ -93,7 +93,7 @@ public class MPPQueryContext {
 
   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
-    this.endPointBlackList = new LinkedList<>();
+    this.endPointBlackList = ConcurrentHashMap.newKeySet();
     this.memoryReservationManager =
         new NotThreadSafeMemoryReservationManager(queryId, 
this.getClass().getName());
   }
@@ -194,7 +194,7 @@ public class MPPQueryContext {
     this.endPointBlackList.add(endPoint);
   }
 
-  public List<TEndPoint> getEndPointBlackList() {
+  public Set<TEndPoint> getEndPointBlackList() {
     return endPointBlackList;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index a4f21d2f20f..adeee8d41eb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -151,12 +151,17 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
             
instances.get(next.getPlanFragment().getIndexInFragmentInstanceList());
         futures.add(asyncDispatchOneInstance(next, fragmentInstance, queue));
       }
+      FragInstanceDispatchResult failedResult = null;
       for (Future<FragInstanceDispatchResult> future : futures) {
+        // Make sure all executing tasks are finished to avoid concurrency 
issues
         FragInstanceDispatchResult result = future.get();
-        if (!result.isSuccessful()) {
-          return immediateFuture(result);
+        if (!result.isSuccessful() && failedResult == null) {
+          failedResult = result;
         }
       }
+      if (failedResult != null) {
+        return immediateFuture(failedResult);
+      }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       LOGGER.warn("Interrupted when dispatching read async", e);

Reply via email to