This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4d55d5121df Optimize load partition routing (#17863)
4d55d5121df is described below
commit 4d55d5121df3ffab6fef8931daae026b2b046e89
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 9 12:03:12 2026 +0800
Optimize load partition routing (#17863)
---
.../plan/analyze/ClusterPartitionFetcher.java | 38 +++++-----
.../plan/node/load/LoadSingleTsFileNode.java | 36 +++++----
.../plan/scheduler/load/LoadTsFileScheduler.java | 88 +++++++++++++---------
3 files changed, 90 insertions(+), 72 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
index 2ccbf0522e8..cba3f22773f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
@@ -273,28 +273,28 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
@Override
public DataPartition getOrCreateDataPartition(
final List<DataPartitionQueryParam> dataPartitionQueryParams, final
String userName) {
- DataPartition dataPartition;
+ final Map<String, List<DataPartitionQueryParam>>
splitDataPartitionQueryParams =
+ splitDataPartitionQueryParam(
+ dataPartitionQueryParams, config.isAutoCreateSchemaEnabled(),
userName);
+ DataPartition dataPartition =
partitionCache.getDataPartition(splitDataPartitionQueryParams);
+ if (null != dataPartition) {
+ return dataPartition;
+ }
+
try (final ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
- final Map<String, List<DataPartitionQueryParam>>
splitDataPartitionQueryParams =
- splitDataPartitionQueryParam(
- dataPartitionQueryParams, config.isAutoCreateSchemaEnabled(),
userName);
- dataPartition =
partitionCache.getDataPartition(splitDataPartitionQueryParams);
-
- if (null == dataPartition) {
- final TDataPartitionReq req =
constructDataPartitionReq(splitDataPartitionQueryParams);
- final TDataPartitionTableResp dataPartitionTableResp =
- client.getOrCreateDataPartitionTable(req);
+ final TDataPartitionReq req =
constructDataPartitionReq(splitDataPartitionQueryParams);
+ final TDataPartitionTableResp dataPartitionTableResp =
+ client.getOrCreateDataPartitionTable(req);
- if (dataPartitionTableResp.getStatus().getCode()
- == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataPartition = parseDataPartitionResp(dataPartitionTableResp);
-
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
- } else {
- throw new IoTDBRuntimeException(
- dataPartitionTableResp.getStatus().getMessage(),
- dataPartitionTableResp.getStatus().getCode());
- }
+ if (dataPartitionTableResp.getStatus().getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ dataPartition = parseDataPartitionResp(dataPartitionTableResp);
+
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
+ } else {
+ throw new IoTDBRuntimeException(
+ dataPartitionTableResp.getStatus().getMessage(),
+ dataPartitionTableResp.getStatus().getCode());
}
} catch (final ClientManagerException | TException e) {
throw new StatementAnalyzeException(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index 098b9fde361..be29fabe367 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -96,27 +96,31 @@ public class LoadSingleTsFileNode extends WritePlanNode {
return true;
}
- List<Pair<IDeviceID, TTimePartitionSlot>> slotList = new ArrayList<>();
- resource
- .getDevices()
- .forEach(
- o -> {
- // iterating the index, must present
- slotList.add(
- new Pair<>(
- o,
TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(o).get())));
- slotList.add(
- new Pair<>(
- o,
TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(o).get())));
- });
+ List<Pair<IDeviceID, TTimePartitionSlot>> slotList =
+ new ArrayList<>(resource.getDevices().size() << 1);
+ for (final IDeviceID device : resource.getDevices()) {
+ // iterating the index, must present
+ final TTimePartitionSlot startSlot =
+
TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(device).get());
+ final TTimePartitionSlot endSlot =
+
TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(device).get());
+ slotList.add(new Pair<>(device, startSlot));
+ if (!startSlot.equals(endSlot)) {
+ slotList.add(new Pair<>(device, endSlot));
+ }
+ }
if (slotList.isEmpty()) {
throw new IllegalStateException(
String.format("Devices in TsFile %s is empty, this should not happen
here.", tsFile));
- } else if (slotList.stream()
- .anyMatch(slotPair ->
!slotPair.getRight().equals(slotList.get(0).right))) {
- needDecodeTsFile = true;
} else {
+ final TTimePartitionSlot firstSlot = slotList.get(0).right;
+ for (int i = 1, size = slotList.size(); i < size; i++) {
+ if (!slotList.get(i).right.equals(firstSlot)) {
+ needDecodeTsFile = true;
+ return true;
+ }
+ }
needDecodeTsFile = !isDispatchedToLocal(new
HashSet<>(partitionFetcher.apply(slotList)));
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 300b8f0eece..6ca3164a3e1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -782,14 +782,29 @@ public class LoadTsFileScheduler implements IScheduler {
return;
}
+ final List<Pair<IDeviceID, TTimePartitionSlot>> partitionSlotList = new
ArrayList<>();
+ final int[] chunkPartitionIndexes = new
int[nonDirectionalChunkData.size()];
+ final Map<IDeviceID, Map<TTimePartitionSlot, Integer>>
partitionSlotIndexes = new HashMap<>();
+ for (int i = 0, size = nonDirectionalChunkData.size(); i < size; i++) {
+ final ChunkData chunkData = nonDirectionalChunkData.get(i);
+ final IDeviceID device = chunkData.getDevice();
+ final TTimePartitionSlot timePartitionSlot =
chunkData.getTimePartitionSlot();
+ final Map<TTimePartitionSlot, Integer> slotIndexes =
+ partitionSlotIndexes.computeIfAbsent(device, key -> new
HashMap<>());
+ Integer partitionSlotIndex = slotIndexes.get(timePartitionSlot);
+ if (partitionSlotIndex == null) {
+ partitionSlotIndex = partitionSlotList.size();
+ slotIndexes.put(timePartitionSlot, partitionSlotIndex);
+ partitionSlotList.add(new Pair<>(device, timePartitionSlot));
+ }
+ chunkPartitionIndexes[i] = partitionSlotIndex;
+ }
+
List<TRegionReplicaSet> replicaSets =
scheduler.partitionFetcher.queryDataPartition(
- nonDirectionalChunkData.stream()
- .map(data -> new Pair<>(data.getDevice(),
data.getTimePartitionSlot()))
- .collect(Collectors.toList()),
- scheduler.queryContext.getSession().getUserName());
- for (int i = 0; i < replicaSets.size(); i++) {
- final TRegionReplicaSet replicaSet = replicaSets.get(i);
+ partitionSlotList,
scheduler.queryContext.getSession().getUserName());
+ for (int i = 0, size = nonDirectionalChunkData.size(); i < size; i++) {
+ final TRegionReplicaSet replicaSet =
replicaSets.get(chunkPartitionIndexes[i]);
final TConsensusGroupId regionId = replicaSet.getRegionId();
if (regionId2ReplicaSetAndNode.containsKey(regionId)
&&
!Objects.equals(regionId2ReplicaSetAndNode.get(regionId).getLeft(),
replicaSet)) {
@@ -864,7 +879,7 @@ public class LoadTsFileScheduler implements IScheduler {
public List<TRegionReplicaSet> queryDataPartition(
List<Pair<IDeviceID, TTimePartitionSlot>> slotList, String userName) {
- List<TRegionReplicaSet> replicaSets = new ArrayList<>();
+ List<TRegionReplicaSet> replicaSets = new ArrayList<>(slotList.size());
int size = slotList.size();
for (int i = 0; i < size; i += TRANSMIT_LIMIT) {
@@ -872,42 +887,41 @@ public class LoadTsFileScheduler implements IScheduler {
slotList.subList(i, Math.min(size, i + TRANSMIT_LIMIT));
DataPartition dataPartition =
fetcher.getOrCreateDataPartition(toQueryParam(subSlotList),
userName);
- replicaSets.addAll(
- subSlotList.stream()
- .map(
- pair ->
- // database is an explicit database hint for
table-model loads and
- // pipe-generated tree-model loads.
- database != null
- ? dataPartition.getDataRegionReplicaSetForWriting(
- pair.left, pair.right, database)
- : dataPartition.getDataRegionReplicaSetForWriting(
- pair.left, pair.right))
- .collect(Collectors.toList()));
+ for (final Pair<IDeviceID, TTimePartitionSlot> pair : subSlotList) {
+ // database is an explicit database hint for table-model loads and
+ // pipe-generated tree-model loads.
+ replicaSets.add(
+ database != null
+ ? dataPartition.getDataRegionReplicaSetForWriting(pair.left,
pair.right, database)
+ : dataPartition.getDataRegionReplicaSetForWriting(pair.left,
pair.right));
+ }
}
return replicaSets;
}
private List<DataPartitionQueryParam> toQueryParam(
List<Pair<IDeviceID, TTimePartitionSlot>> slots) {
- return slots.stream()
- .collect(
- Collectors.groupingBy(
- Pair::getLeft, Collectors.mapping(Pair::getRight,
Collectors.toSet())))
- .entrySet()
- .stream()
- .map(
- entry -> {
- DataPartitionQueryParam queryParam =
- new DataPartitionQueryParam(entry.getKey(), new
ArrayList<>(entry.getValue()));
- // database is an explicit database hint for table-model loads
and
- // pipe-generated tree-model loads.
- if (database != null) {
- queryParam.setDatabaseName(database);
- }
- return queryParam;
- })
- .collect(Collectors.toList());
+ final Map<IDeviceID, Set<TTimePartitionSlot>> device2TimePartitionSlots
= new HashMap<>();
+ for (final Pair<IDeviceID, TTimePartitionSlot> slot : slots) {
+ device2TimePartitionSlots
+ .computeIfAbsent(slot.left, key -> new HashSet<>())
+ .add(slot.right);
+ }
+
+ final List<DataPartitionQueryParam> queryParams =
+ new ArrayList<>(device2TimePartitionSlots.size());
+ for (final Map.Entry<IDeviceID, Set<TTimePartitionSlot>> entry :
+ device2TimePartitionSlots.entrySet()) {
+ final DataPartitionQueryParam queryParam =
+ new DataPartitionQueryParam(entry.getKey(), new
ArrayList<>(entry.getValue()));
+ // database is an explicit database hint for table-model loads and
+ // pipe-generated tree-model loads.
+ if (database != null) {
+ queryParam.setDatabaseName(database);
+ }
+ queryParams.add(queryParam);
+ }
+ return queryParams;
}
}
}