This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 7cffde507d6 Optimize load partition routing (#17863) (#17884)
7cffde507d6 is described below

commit 7cffde507d6e81dd0d5af385c645d60093ce97f8
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 9 18:18:02 2026 +0800

    Optimize load partition routing (#17863) (#17884)
---
 .../plan/analyze/ClusterPartitionFetcher.java      | 38 +++++------
 .../plan/node/load/LoadSingleTsFileNode.java       | 40 +++++++-----
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 74 +++++++++++++---------
 3 files changed, 88 insertions(+), 64 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
index eb65666035a..6b28dbda347 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
@@ -258,26 +258,28 @@ public class ClusterPartitionFetcher implements 
IPartitionFetcher {
             dataPartitionQueryParams, config.isAutoCreateSchemaEnabled(), 
userName);
     DataPartition dataPartition = 
partitionCache.getDataPartition(splitDataPartitionQueryParams);
 
-    if (null == dataPartition) {
-      try (ConfigNodeClient client =
-          
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-        TDataPartitionReq req = 
constructDataPartitionReq(splitDataPartitionQueryParams);
-        TDataPartitionTableResp dataPartitionTableResp = 
client.getOrCreateDataPartitionTable(req);
+    if (null != dataPartition) {
+      return dataPartition;
+    }
 
-        if (dataPartitionTableResp.getStatus().getCode()
-            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          dataPartition = parseDataPartitionResp(dataPartitionTableResp);
-          
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
-        } else {
-          throw new RuntimeException(
-              new IoTDBException(
-                  dataPartitionTableResp.getStatus().getMessage(),
-                  dataPartitionTableResp.getStatus().getCode()));
-        }
-      } catch (ClientManagerException | TException e) {
-        throw new StatementAnalyzeException(
-            "An error occurred when executing getOrCreateDataPartition():" + 
e.getMessage());
+    try (ConfigNodeClient client =
+        configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) 
{
+      TDataPartitionReq req = 
constructDataPartitionReq(splitDataPartitionQueryParams);
+      TDataPartitionTableResp dataPartitionTableResp = 
client.getOrCreateDataPartitionTable(req);
+
+      if (dataPartitionTableResp.getStatus().getCode()
+          == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        dataPartition = parseDataPartitionResp(dataPartitionTableResp);
+        
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
+      } else {
+        throw new RuntimeException(
+            new IoTDBException(
+                dataPartitionTableResp.getStatus().getMessage(),
+                dataPartitionTableResp.getStatus().getCode()));
       }
+    } catch (ClientManagerException | TException e) {
+      throw new StatementAnalyzeException(
+          "An error occurred when executing getOrCreateDataPartition():" + 
e.getMessage());
     }
     return dataPartition;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index 64b4655be28..a168228deb3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -79,27 +79,35 @@ public class LoadSingleTsFileNode extends WritePlanNode {
   public boolean needDecodeTsFile(
       Function<List<Pair<IDeviceID, TTimePartitionSlot>>, 
List<TRegionReplicaSet>>
           partitionFetcher) {
-    List<Pair<IDeviceID, TTimePartitionSlot>> slotList = new ArrayList<>();
-    resource
-        .getDevices()
-        .forEach(
-            o -> {
-              // iterating the index, must present
-              slotList.add(
-                  new Pair<>(
-                      o, 
TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(o).get())));
-              slotList.add(
-                  new Pair<>(
-                      o, 
TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(o).get())));
-            });
+    if (needDecodeTsFile) {
+      return true;
+    }
+
+    List<Pair<IDeviceID, TTimePartitionSlot>> slotList =
+        new ArrayList<>(resource.getDevices().size() << 1);
+    for (final IDeviceID device : resource.getDevices()) {
+      // iterating the index, must present
+      final TTimePartitionSlot startSlot =
+          
TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(device).get());
+      final TTimePartitionSlot endSlot =
+          
TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(device).get());
+      slotList.add(new Pair<>(device, startSlot));
+      if (!startSlot.equals(endSlot)) {
+        slotList.add(new Pair<>(device, endSlot));
+      }
+    }
 
     if (slotList.isEmpty()) {
       throw new IllegalStateException(
           String.format("Devices in TsFile %s is empty, this should not happen 
here.", tsFile));
-    } else if (slotList.stream()
-        .anyMatch(slotPair -> 
!slotPair.getRight().equals(slotList.get(0).right))) {
-      needDecodeTsFile = true;
     } else {
+      final TTimePartitionSlot firstSlot = slotList.get(0).right;
+      for (int i = 1, size = slotList.size(); i < size; i++) {
+        if (!slotList.get(i).right.equals(firstSlot)) {
+          needDecodeTsFile = true;
+          return true;
+        }
+      }
       needDecodeTsFile = !isDispatchedToLocal(new 
HashSet<>(partitionFetcher.apply(slotList)));
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 5cafe3b8384..e61367ed896 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -709,18 +709,29 @@ public class LoadTsFileScheduler implements IScheduler {
         return;
       }
 
+      final List<Pair<IDeviceID, TTimePartitionSlot>> partitionSlotList = new 
ArrayList<>();
+      final int[] chunkPartitionIndexes = new 
int[nonDirectionalChunkData.size()];
+      final Map<IDeviceID, Map<TTimePartitionSlot, Integer>> 
partitionSlotIndexes = new HashMap<>();
+      for (int i = 0, size = nonDirectionalChunkData.size(); i < size; i++) {
+        final ChunkData chunkData = nonDirectionalChunkData.get(i);
+        final IDeviceID device = new PlainDeviceID(chunkData.getDevice());
+        final TTimePartitionSlot timePartitionSlot = 
chunkData.getTimePartitionSlot();
+        final Map<TTimePartitionSlot, Integer> slotIndexes =
+            partitionSlotIndexes.computeIfAbsent(device, key -> new 
HashMap<>());
+        Integer partitionSlotIndex = slotIndexes.get(timePartitionSlot);
+        if (partitionSlotIndex == null) {
+          partitionSlotIndex = partitionSlotList.size();
+          slotIndexes.put(timePartitionSlot, partitionSlotIndex);
+          partitionSlotList.add(new Pair<>(device, timePartitionSlot));
+        }
+        chunkPartitionIndexes[i] = partitionSlotIndex;
+      }
+
       List<TRegionReplicaSet> replicaSets =
           scheduler.partitionFetcher.queryDataPartition(
-              nonDirectionalChunkData.stream()
-                  .map(
-                      data ->
-                          new Pair<>(
-                              (IDeviceID) new PlainDeviceID(data.getDevice()),
-                              data.getTimePartitionSlot()))
-                  .collect(Collectors.toList()),
-              scheduler.queryContext.getSession().getUserName());
-      for (int i = 0; i < replicaSets.size(); i++) {
-        final TRegionReplicaSet replicaSet = replicaSets.get(i);
+              partitionSlotList, 
scheduler.queryContext.getSession().getUserName());
+      for (int i = 0, size = nonDirectionalChunkData.size(); i < size; i++) {
+        final TRegionReplicaSet replicaSet = 
replicaSets.get(chunkPartitionIndexes[i]);
         final TConsensusGroupId regionId = replicaSet.getRegionId();
         if (regionId2ReplicaSetAndNode.containsKey(regionId)
             && 
!Objects.equals(regionId2ReplicaSetAndNode.get(regionId).getLeft(), 
replicaSet)) {
@@ -790,7 +801,7 @@ public class LoadTsFileScheduler implements IScheduler {
 
     public List<TRegionReplicaSet> queryDataPartition(
         List<Pair<IDeviceID, TTimePartitionSlot>> slotList, String userName) {
-      List<TRegionReplicaSet> replicaSets = new ArrayList<>();
+      List<TRegionReplicaSet> replicaSets = new ArrayList<>(slotList.size());
       int size = slotList.size();
 
       for (int i = 0; i < size; i += TRANSMIT_LIMIT) {
@@ -798,31 +809,34 @@ public class LoadTsFileScheduler implements IScheduler {
             slotList.subList(i, Math.min(size, i + TRANSMIT_LIMIT));
         DataPartition dataPartition =
             fetcher.getOrCreateDataPartition(toQueryParam(subSlotList), 
userName);
-        replicaSets.addAll(
-            subSlotList.stream()
-                .map(
-                    pair ->
-                        dataPartition.getDataRegionReplicaSetForWriting(
-                            ((PlainDeviceID) pair.left).toStringID(), 
pair.right))
-                .collect(Collectors.toList()));
+        for (final Pair<IDeviceID, TTimePartitionSlot> pair : subSlotList) {
+          replicaSets.add(
+              dataPartition.getDataRegionReplicaSetForWriting(
+                  ((PlainDeviceID) pair.left).toStringID(), pair.right));
+        }
       }
       return replicaSets;
     }
 
     private List<DataPartitionQueryParam> toQueryParam(
         List<Pair<IDeviceID, TTimePartitionSlot>> slots) {
-      return slots.stream()
-          .collect(
-              Collectors.groupingBy(
-                  Pair::getLeft, Collectors.mapping(Pair::getRight, 
Collectors.toSet())))
-          .entrySet()
-          .stream()
-          .map(
-              entry ->
-                  new DataPartitionQueryParam(
-                      ((PlainDeviceID) entry.getKey()).toStringID(),
-                      new ArrayList<>(entry.getValue())))
-          .collect(Collectors.toList());
+      final Map<IDeviceID, Set<TTimePartitionSlot>> device2TimePartitionSlots 
= new HashMap<>();
+      for (final Pair<IDeviceID, TTimePartitionSlot> slot : slots) {
+        device2TimePartitionSlots
+            .computeIfAbsent(slot.left, key -> new HashSet<>())
+            .add(slot.right);
+      }
+
+      final List<DataPartitionQueryParam> queryParams =
+          new ArrayList<>(device2TimePartitionSlots.size());
+      for (final Map.Entry<IDeviceID, Set<TTimePartitionSlot>> entry :
+          device2TimePartitionSlots.entrySet()) {
+        final DataPartitionQueryParam queryParam =
+            new DataPartitionQueryParam(
+                ((PlainDeviceID) entry.getKey()).toStringID(), new 
ArrayList<>(entry.getValue()));
+        queryParams.add(queryParam);
+      }
+      return queryParams;
     }
   }
 }

Reply via email to