This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/InnerTimeJoin
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/InnerTimeJoin by this push:
new 48684b7cf2a skip useless time partition
48684b7cf2a is described below
commit 48684b7cf2ac14fcc87928e815eb33a3ad3cb471
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Jan 4 12:17:53 2024 +0800
skip useless time partition
---
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 5 +-
.../db/queryengine/plan/analyze/Analysis.java | 4 +-
.../plan/planner/distribution/SourceRewriter.java | 79 +++-------------------
.../planner/plan/node/write/DeleteDataNode.java | 2 +-
.../iotdb/commons/partition/DataPartition.java | 3 +-
5 files changed, 18 insertions(+), 75 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 82ba4a3ed18..645159b75ae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -789,7 +789,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
partitionFetcher.getDataPartitionWithUnclosedTimeRange(
Collections.singletonMap(db,
Collections.singletonList(queryParam)));
List<TRegionReplicaSet> regionReplicaSets =
- dataPartition.getDataRegionReplicaSet(deviceId, null);
+ dataPartition.getDataRegionReplicaSetWithTimeFilter(deviceId, null);
// no valid DataRegion
if (regionReplicaSets.isEmpty()
@@ -996,7 +996,8 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
sgNameToQueryParamsMap.put(database,
Collections.singletonList(queryParam));
DataPartition dataPartition =
partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
List<DataRegion> dataRegionList = new ArrayList<>();
- List<TRegionReplicaSet> replicaSets = dataPartition.getDataRegionReplicaSet(deviceId, null);
+ List<TRegionReplicaSet> replicaSets =
+ dataPartition.getDataRegionReplicaSetWithTimeFilter(deviceId, null);
for (TRegionReplicaSet region : replicaSets) {
dataRegionList.add(
StorageEngine.getInstance()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index 635ba9119b3..86e58e33f17 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -294,7 +294,7 @@ public class Analysis {
}
public List<TRegionReplicaSet> getPartitionInfo(PartialPath seriesPath,
Filter timefilter) {
- return dataPartition.getDataRegionReplicaSet(seriesPath.getDevice(), timefilter);
+ return dataPartition.getDataRegionReplicaSetWithTimeFilter(seriesPath.getDevice(), timefilter);
}
public TRegionReplicaSet getPartitionInfo(
@@ -312,7 +312,7 @@ public class Analysis {
}
public List<TRegionReplicaSet> getPartitionInfo(String deviceName, Filter
globalTimeFilter) {
- return dataPartition.getDataRegionReplicaSet(deviceName, globalTimeFilter);
+ return dataPartition.getDataRegionReplicaSetWithTimeFilter(deviceName, globalTimeFilter);
}
public Statement getStatement() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index f27faab7d7d..e8e3563acc4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -83,6 +83,7 @@ import java.util.TreeSet;
import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanContext> {
@@ -739,7 +740,7 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
return Collections.emptyList();
}
List<List<TTimePartitionSlot>> res = new
ArrayList<>(childTimePartitionList.get(0));
- for (int i = 1, size = childTimePartitionList.size(); i < size; i++) {
+ for (int i = 1, size = childTimePartitionList.size(); i < size && !res.isEmpty(); i++) {
res = combineTwoTimePartitionList(res, childTimePartitionList.get(i));
}
return res;
@@ -753,38 +754,6 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
int rightSize = right.size();
List<List<TTimePartitionSlot>> res = new ArrayList<>(Math.max(leftSize,
rightSize));
- // common case, one SeriesSlot only belongs to one data region
- if (leftSize == 1 && rightSize == 1) {
- List<TTimePartitionSlot> list = new ArrayList<>();
- List<TTimePartitionSlot> left0 = left.get(0);
- List<TTimePartitionSlot> right0 = left.get(0);
-
- int left0Index = 0;
- int left0Size = left0.size();
- int right0Index = 0;
- int right0Size = right0.size();
- while (left0Index < left0Size && right0Index < right0Size) {
- if (left0.get(left0Index).startTime == right0.get(right0Index).startTime) {
- list.add(left0.get(left0Index));
- left0Index++;
- right0Index++;
- } else if (left0.get(left0Index).startTime < right0.get(right0Index).startTime) {
- list.add(left0.get(left0Index));
- left0Index++;
- } else {
- list.add(right0.get(left0Index));
- right0Index++;
- }
- }
- if (left0Index < left0Size) {
- list.addAll(left0Index, left0);
- }
- if (right0Index < right0Size) {
- list.addAll(right0Index, right0);
- }
- res.add(list);
- return res;
- }
int previousResIndex = 0;
res.add(new ArrayList<>());
@@ -797,6 +766,7 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
int rightCurrentListSize = rightCurrentList.size();
while (leftCurrentListIndex < leftCurrentListSize
&& rightCurrentListIndex < rightCurrentListSize) {
+ // only keep time partition in All SeriesSlot
if (leftCurrentList.get(leftCurrentListIndex).startTime
== rightCurrentList.get(rightCurrentListIndex).startTime) {
// new continuous time range
@@ -810,20 +780,8 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
rightCurrentListIndex++;
} else if (leftCurrentList.get(leftCurrentListIndex).startTime
< rightCurrentList.get(rightCurrentListIndex).startTime) {
- // new continuous time range
- if (leftCurrentListIndex == 0 && leftIndex != 0) {
- previousResIndex++;
- res.add(new ArrayList<>());
- }
- res.get(previousResIndex).add(leftCurrentList.get(leftCurrentListIndex));
leftCurrentListIndex++;
} else {
- // new continuous time range
- if (rightCurrentListIndex == 0 && rightIndex != 0) {
- previousResIndex++;
- res.add(new ArrayList<>());
- }
- res.get(previousResIndex).add(rightCurrentList.get(rightCurrentListIndex));
rightCurrentListIndex++;
}
}
@@ -836,30 +794,6 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
rightCurrentListIndex = 0;
}
}
-
- if (leftIndex == leftSize) {
- while (rightIndex < rightSize) {
- if (rightCurrentListIndex == 0 && rightIndex != 0) {
- previousResIndex++;
- res.add(new ArrayList<>());
- }
- res.get(previousResIndex).addAll(rightCurrentListIndex, right.get(rightIndex));
- rightIndex++;
- rightCurrentListIndex = 0;
- }
- }
-
- if (rightIndex == rightSize) {
- while (leftIndex < leftSize) {
- if (leftCurrentListIndex == 0 && leftIndex != 0) {
- previousResIndex++;
- res.add(new ArrayList<>());
- }
- res.get(previousResIndex).addAll(leftCurrentListIndex, left.get(leftIndex));
- leftIndex++;
- leftCurrentListIndex = 0;
- }
- }
return res;
}
@@ -905,6 +839,13 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
}
}
}
+
+ if (subInnerJoinNode.isEmpty()) {
+ for (SeriesSourceNode sourceNode : seriesScanNodes) {
+ sourceNode.setRegionReplicaSet(NOT_ASSIGNED);
+ }
+ return Collections.singletonList(node);
+ }
return subInnerJoinNode;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
index 022ba66087a..a72d8c4349c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
@@ -294,7 +294,7 @@ public class DeleteDataNode extends WritePlanNode
implements WALEntryValue {
PartialPath devicePath = deviceSchemaInfo.getDevicePath();
// regionId is null when data region of devicePath not existed
dataPartition
- .getDataRegionReplicaSet(
+ .getDataRegionReplicaSetWithTimeFilter(
devicePath.getFullPath(), TimeFilterApi.between(deleteStartTime,
deleteEndTime))
.stream()
.filter(regionReplicaSet -> regionReplicaSet.getRegionId() != null)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 07927bb4f24..66489604d53 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -116,7 +116,8 @@ public class DataPartition extends Partition {
return res;
}
- public List<TRegionReplicaSet> getDataRegionReplicaSet(String deviceName, Filter timeFilter) {
+ public List<TRegionReplicaSet> getDataRegionReplicaSetWithTimeFilter(
+ String deviceName, Filter timeFilter) {
String storageGroup = getStorageGroupByDevice(deviceName);
TSeriesPartitionSlot seriesPartitionSlot =
calculateDeviceGroupId(deviceName);
if (!dataPartitionMap.containsKey(storageGroup)