This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 7cffde507d6 Optimize load partition routing (#17863) (#17884)
7cffde507d6 is described below
commit 7cffde507d6e81dd0d5af385c645d60093ce97f8
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 9 18:18:02 2026 +0800
Optimize load partition routing (#17863) (#17884)
---
.../plan/analyze/ClusterPartitionFetcher.java | 38 +++++------
.../plan/node/load/LoadSingleTsFileNode.java | 40 +++++++-----
.../plan/scheduler/load/LoadTsFileScheduler.java | 74 +++++++++++++---------
3 files changed, 88 insertions(+), 64 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
index eb65666035a..6b28dbda347 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
@@ -258,26 +258,28 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
dataPartitionQueryParams, config.isAutoCreateSchemaEnabled(),
userName);
DataPartition dataPartition =
partitionCache.getDataPartition(splitDataPartitionQueryParams);
- if (null == dataPartition) {
- try (ConfigNodeClient client =
-
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- TDataPartitionReq req =
constructDataPartitionReq(splitDataPartitionQueryParams);
- TDataPartitionTableResp dataPartitionTableResp =
client.getOrCreateDataPartitionTable(req);
+ if (null != dataPartition) {
+ return dataPartition;
+ }
- if (dataPartitionTableResp.getStatus().getCode()
- == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataPartition = parseDataPartitionResp(dataPartitionTableResp);
-
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
- } else {
- throw new RuntimeException(
- new IoTDBException(
- dataPartitionTableResp.getStatus().getMessage(),
- dataPartitionTableResp.getStatus().getCode()));
- }
- } catch (ClientManagerException | TException e) {
- throw new StatementAnalyzeException(
- "An error occurred when executing getOrCreateDataPartition():" +
e.getMessage());
+ try (ConfigNodeClient client =
+ configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+ TDataPartitionReq req =
constructDataPartitionReq(splitDataPartitionQueryParams);
+ TDataPartitionTableResp dataPartitionTableResp =
client.getOrCreateDataPartitionTable(req);
+
+ if (dataPartitionTableResp.getStatus().getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ dataPartition = parseDataPartitionResp(dataPartitionTableResp);
+
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
+ } else {
+ throw new RuntimeException(
+ new IoTDBException(
+ dataPartitionTableResp.getStatus().getMessage(),
+ dataPartitionTableResp.getStatus().getCode()));
}
+ } catch (ClientManagerException | TException e) {
+ throw new StatementAnalyzeException(
+ "An error occurred when executing getOrCreateDataPartition():" +
e.getMessage());
}
return dataPartition;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index 64b4655be28..a168228deb3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -79,27 +79,35 @@ public class LoadSingleTsFileNode extends WritePlanNode {
public boolean needDecodeTsFile(
Function<List<Pair<IDeviceID, TTimePartitionSlot>>,
List<TRegionReplicaSet>>
partitionFetcher) {
- List<Pair<IDeviceID, TTimePartitionSlot>> slotList = new ArrayList<>();
- resource
- .getDevices()
- .forEach(
- o -> {
- // iterating the index, must present
- slotList.add(
- new Pair<>(
- o,
TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(o).get())));
- slotList.add(
- new Pair<>(
- o,
TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(o).get())));
- });
+ if (needDecodeTsFile) {
+ return true;
+ }
+
+ List<Pair<IDeviceID, TTimePartitionSlot>> slotList =
+ new ArrayList<>(resource.getDevices().size() << 1);
+ for (final IDeviceID device : resource.getDevices()) {
+ // iterating the index, must present
+ final TTimePartitionSlot startSlot =
+
TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(device).get());
+ final TTimePartitionSlot endSlot =
+
TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(device).get());
+ slotList.add(new Pair<>(device, startSlot));
+ if (!startSlot.equals(endSlot)) {
+ slotList.add(new Pair<>(device, endSlot));
+ }
+ }
if (slotList.isEmpty()) {
throw new IllegalStateException(
String.format("Devices in TsFile %s is empty, this should not happen
here.", tsFile));
- } else if (slotList.stream()
- .anyMatch(slotPair ->
!slotPair.getRight().equals(slotList.get(0).right))) {
- needDecodeTsFile = true;
} else {
+ final TTimePartitionSlot firstSlot = slotList.get(0).right;
+ for (int i = 1, size = slotList.size(); i < size; i++) {
+ if (!slotList.get(i).right.equals(firstSlot)) {
+ needDecodeTsFile = true;
+ return true;
+ }
+ }
needDecodeTsFile = !isDispatchedToLocal(new
HashSet<>(partitionFetcher.apply(slotList)));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 5cafe3b8384..e61367ed896 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -709,18 +709,29 @@ public class LoadTsFileScheduler implements IScheduler {
return;
}
+ final List<Pair<IDeviceID, TTimePartitionSlot>> partitionSlotList = new
ArrayList<>();
+ final int[] chunkPartitionIndexes = new
int[nonDirectionalChunkData.size()];
+ final Map<IDeviceID, Map<TTimePartitionSlot, Integer>>
partitionSlotIndexes = new HashMap<>();
+ for (int i = 0, size = nonDirectionalChunkData.size(); i < size; i++) {
+ final ChunkData chunkData = nonDirectionalChunkData.get(i);
+ final IDeviceID device = new PlainDeviceID(chunkData.getDevice());
+ final TTimePartitionSlot timePartitionSlot =
chunkData.getTimePartitionSlot();
+ final Map<TTimePartitionSlot, Integer> slotIndexes =
+ partitionSlotIndexes.computeIfAbsent(device, key -> new
HashMap<>());
+ Integer partitionSlotIndex = slotIndexes.get(timePartitionSlot);
+ if (partitionSlotIndex == null) {
+ partitionSlotIndex = partitionSlotList.size();
+ slotIndexes.put(timePartitionSlot, partitionSlotIndex);
+ partitionSlotList.add(new Pair<>(device, timePartitionSlot));
+ }
+ chunkPartitionIndexes[i] = partitionSlotIndex;
+ }
+
List<TRegionReplicaSet> replicaSets =
scheduler.partitionFetcher.queryDataPartition(
- nonDirectionalChunkData.stream()
- .map(
- data ->
- new Pair<>(
- (IDeviceID) new PlainDeviceID(data.getDevice()),
- data.getTimePartitionSlot()))
- .collect(Collectors.toList()),
- scheduler.queryContext.getSession().getUserName());
- for (int i = 0; i < replicaSets.size(); i++) {
- final TRegionReplicaSet replicaSet = replicaSets.get(i);
+ partitionSlotList,
scheduler.queryContext.getSession().getUserName());
+ for (int i = 0, size = nonDirectionalChunkData.size(); i < size; i++) {
+ final TRegionReplicaSet replicaSet =
replicaSets.get(chunkPartitionIndexes[i]);
final TConsensusGroupId regionId = replicaSet.getRegionId();
if (regionId2ReplicaSetAndNode.containsKey(regionId)
&&
!Objects.equals(regionId2ReplicaSetAndNode.get(regionId).getLeft(),
replicaSet)) {
@@ -790,7 +801,7 @@ public class LoadTsFileScheduler implements IScheduler {
public List<TRegionReplicaSet> queryDataPartition(
List<Pair<IDeviceID, TTimePartitionSlot>> slotList, String userName) {
- List<TRegionReplicaSet> replicaSets = new ArrayList<>();
+ List<TRegionReplicaSet> replicaSets = new ArrayList<>(slotList.size());
int size = slotList.size();
for (int i = 0; i < size; i += TRANSMIT_LIMIT) {
@@ -798,31 +809,34 @@ public class LoadTsFileScheduler implements IScheduler {
slotList.subList(i, Math.min(size, i + TRANSMIT_LIMIT));
DataPartition dataPartition =
fetcher.getOrCreateDataPartition(toQueryParam(subSlotList),
userName);
- replicaSets.addAll(
- subSlotList.stream()
- .map(
- pair ->
- dataPartition.getDataRegionReplicaSetForWriting(
- ((PlainDeviceID) pair.left).toStringID(),
pair.right))
- .collect(Collectors.toList()));
+ for (final Pair<IDeviceID, TTimePartitionSlot> pair : subSlotList) {
+ replicaSets.add(
+ dataPartition.getDataRegionReplicaSetForWriting(
+ ((PlainDeviceID) pair.left).toStringID(), pair.right));
+ }
}
return replicaSets;
}
private List<DataPartitionQueryParam> toQueryParam(
List<Pair<IDeviceID, TTimePartitionSlot>> slots) {
- return slots.stream()
- .collect(
- Collectors.groupingBy(
- Pair::getLeft, Collectors.mapping(Pair::getRight,
Collectors.toSet())))
- .entrySet()
- .stream()
- .map(
- entry ->
- new DataPartitionQueryParam(
- ((PlainDeviceID) entry.getKey()).toStringID(),
- new ArrayList<>(entry.getValue())))
- .collect(Collectors.toList());
+ final Map<IDeviceID, Set<TTimePartitionSlot>> device2TimePartitionSlots
= new HashMap<>();
+ for (final Pair<IDeviceID, TTimePartitionSlot> slot : slots) {
+ device2TimePartitionSlots
+ .computeIfAbsent(slot.left, key -> new HashSet<>())
+ .add(slot.right);
+ }
+
+ final List<DataPartitionQueryParam> queryParams =
+ new ArrayList<>(device2TimePartitionSlots.size());
+ for (final Map.Entry<IDeviceID, Set<TTimePartitionSlot>> entry :
+ device2TimePartitionSlots.entrySet()) {
+ final DataPartitionQueryParam queryParam =
+ new DataPartitionQueryParam(
+ ((PlainDeviceID) entry.getKey()).toStringID(), new
ArrayList<>(entry.getValue()));
+ queryParams.add(queryParam);
+ }
+ return queryParams;
}
}
}