This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1a13919d5960ee4020907c4129782976231bb2f3
Author: shuwenwei <[email protected]>
AuthorDate: Tue Jul 29 20:48:38 2025 +0800

    Fix concurrent modification of non-thread-safe data structures caused by parallel dispatching

    (cherry picked from commit ee9d7d1b04e080ec08f356ec1949f717e882604c)
---
 .../apache/iotdb/db/queryengine/common/MPPQueryContext.java | 10 +++++-----
 .../plan/scheduler/FragmentInstanceDispatcherImpl.java | 9 +++++++--
 2 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index b4bd87b2e7e..d0b7a072b54 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -34,10 +34,9 @@ import org.apache.tsfile.read.filter.basic.Filter;

 import java.time.ZoneId;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;

 /**
  * This class is used to record the context of a query including QueryId, query statement, session
@@ -66,7 +65,8 @@ public class MPPQueryContext {
   // When some DataNode cannot be connected, its endPoint will be put
   // in this list. And the following retry will avoid planning fragment
   // onto this node.
-  private final List<TEndPoint> endPointBlackList;
+  // When dispatch FI fails, this structure may be modified concurrently
+  private final Set<TEndPoint> endPointBlackList;

   private final TypeProvider typeProvider = new TypeProvider();

@@ -93,7 +93,7 @@ public class MPPQueryContext {

   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
-    this.endPointBlackList = new LinkedList<>();
+    this.endPointBlackList = ConcurrentHashMap.newKeySet();
     this.memoryReservationManager =
         new NotThreadSafeMemoryReservationManager(queryId, this.getClass().getName());
   }
@@ -194,7 +194,7 @@ public class MPPQueryContext {
     this.endPointBlackList.add(endPoint);
   }

-  public List<TEndPoint> getEndPointBlackList() {
+  public Set<TEndPoint> getEndPointBlackList() {
     return endPointBlackList;
   }

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index a4f21d2f20f..adeee8d41eb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -151,12 +151,17 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
             instances.get(next.getPlanFragment().getIndexInFragmentInstanceList());
         futures.add(asyncDispatchOneInstance(next, fragmentInstance, queue));
       }
+      FragInstanceDispatchResult failedResult = null;
       for (Future<FragInstanceDispatchResult> future : futures) {
+        // Make sure all executing tasks are finished to avoid concurrency issues
         FragInstanceDispatchResult result = future.get();
-        if (!result.isSuccessful()) {
-          return immediateFuture(result);
+        if (!result.isSuccessful() && failedResult == null) {
+          failedResult = result;
         }
       }
+      if (failedResult != null) {
+        return immediateFuture(failedResult);
+      }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       LOGGER.warn("Interrupted when dispatching read async", e);
