This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new ef872f3856d Fix concurrent modification of non-thread-safe data 
structures caused by parallel dispatching
ef872f3856d is described below

commit ef872f3856d5c0286f4f18f8a95c01f6e9c637d6
Author: shuwenwei <[email protected]>
AuthorDate: Fri Aug 15 14:19:48 2025 +0800

    Fix concurrent modification of non-thread-safe data structures caused by 
parallel dispatching
---
 .../apache/iotdb/db/queryengine/common/MPPQueryContext.java    | 10 +++++-----
 .../plan/scheduler/FragmentInstanceDispatcherImpl.java         |  9 +++++++--
 2 files changed, 12 insertions(+), 7 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 0035f726c4d..fc457ab9ce4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -34,9 +34,8 @@ import org.apache.tsfile.read.filter.basic.Filter;
 
 import java.time.ZoneId;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.LongConsumer;
 
 /**
@@ -66,7 +65,8 @@ public class MPPQueryContext {
   // When some DataNode cannot be connected, its endPoint will be put
   // in this list. And the following retry will avoid planning fragment
   // onto this node.
-  private final List<TEndPoint> endPointBlackList;
+  // When dispatch FI fails, this structure may be modified concurrently
+  private final Set<TEndPoint> endPointBlackList;
 
   private final TypeProvider typeProvider = new TypeProvider();
 
@@ -97,7 +97,7 @@ public class MPPQueryContext {
 
   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
-    this.endPointBlackList = new LinkedList<>();
+    this.endPointBlackList = ConcurrentHashMap.newKeySet();
     this.memoryReservationManager =
         new NotThreadSafeMemoryReservationManager(queryId, 
this.getClass().getName());
   }
@@ -222,7 +222,7 @@ public class MPPQueryContext {
     this.endPointBlackList.add(endPoint);
   }
 
-  public List<TEndPoint> getEndPointBlackList() {
+  public Set<TEndPoint> getEndPointBlackList() {
     return endPointBlackList;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index fce62782abc..162ccf22a75 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -146,12 +146,17 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
             
instances.get(next.getPlanFragment().getIndexInFragmentInstanceList());
         futures.add(asyncDispatchOneInstance(next, fragmentInstance, queue));
       }
+      FragInstanceDispatchResult failedResult = null;
       for (Future<FragInstanceDispatchResult> future : futures) {
+        // Make sure all executing tasks are finished to avoid concurrency 
issues
         FragInstanceDispatchResult result = future.get();
-        if (!result.isSuccessful()) {
-          return immediateFuture(result);
+        if (!result.isSuccessful() && failedResult == null) {
+          failedResult = result;
         }
       }
+      if (failedResult != null) {
+        return immediateFuture(failedResult);
+      }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       LOGGER.warn("Interrupted when dispatching read async", e);

Reply via email to