This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 17ba8737107 [To rel/1.1] Support show timeseries of system db in tail
(#10159)
17ba8737107 is described below
commit 17ba8737107083969b1e13ab017208b63f50efb4
Author: Marcos_Zyk <[email protected]>
AuthorDate: Thu Jun 15 14:32:49 2023 +0800
[To rel/1.1] Support show timeseries of system db in tail (#10159)
---
.../plan/planner/distribution/SourceRewriter.java | 122 +++++++++++++++------
1 file changed, 90 insertions(+), 32 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 16771c6ad16..57f57fa1b9a 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -65,6 +65,7 @@ import
org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.metrics.utils.IoTDBMetricsUtils;
import java.util.ArrayList;
import java.util.Collections;
@@ -270,6 +271,7 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
SchemaQueryMergeNode root = (SchemaQueryMergeNode) node.clone();
SchemaQueryScanNode seed = (SchemaQueryScanNode) node.getChildren().get(0);
List<PartialPath> pathPatternList = seed.getPathPatternList();
+ Set<TRegionReplicaSet> regionsOfSystemDatabase = new HashSet<>();
if (pathPatternList.size() == 1) {
// the path pattern overlaps with all storageGroup or storageGroup.**
TreeSet<TRegionReplicaSet> schemaRegions =
@@ -279,16 +281,33 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
.getSchemaPartitionMap()
.forEach(
(storageGroup, deviceGroup) -> {
- deviceGroup.forEach(
- (deviceGroupId, schemaRegionReplicaSet) ->
- schemaRegions.add(schemaRegionReplicaSet));
+ if (storageGroup.equals(IoTDBMetricsUtils.DATABASE)) {
+ deviceGroup.forEach(
+ (deviceGroupId, schemaRegionReplicaSet) ->
+ regionsOfSystemDatabase.add(schemaRegionReplicaSet));
+ } else {
+ deviceGroup.forEach(
+ (deviceGroupId, schemaRegionReplicaSet) ->
+ schemaRegions.add(schemaRegionReplicaSet));
+ }
});
schemaRegions.forEach(
region -> {
- SchemaQueryScanNode schemaQueryScanNode = (SchemaQueryScanNode)
seed.clone();
-
schemaQueryScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
- schemaQueryScanNode.setRegionReplicaSet(region);
- root.addChild(schemaQueryScanNode);
+ addSchemaSourceNode(
+ root,
+ seed.getPath(),
+ region,
+ context.queryContext.getQueryId().genPlanNodeId(),
+ seed);
+ });
+ regionsOfSystemDatabase.forEach(
+ region -> {
+ addSchemaSourceNode(
+ root,
+ seed.getPath(),
+ region,
+ context.queryContext.getQueryId().genPlanNodeId(),
+ seed);
});
} else {
// the path pattern may only overlap with part of storageGroup or
storageGroup.**, need filter
@@ -302,43 +321,82 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
.getSchemaPartitionMap()
.forEach(
(storageGroup, deviceGroup) -> {
- deviceGroup.forEach(
- (deviceGroupId, schemaRegionReplicaSet) ->
- storageGroupSchemaRegionMap
- .computeIfAbsent(storageGroup, k -> new
HashSet<>())
- .add(schemaRegionReplicaSet));
+ if (storageGroup.equals(IoTDBMetricsUtils.DATABASE)) {
+ deviceGroup.forEach(
+ (deviceGroupId, schemaRegionReplicaSet) ->
+ regionsOfSystemDatabase.add(schemaRegionReplicaSet));
+ } else {
+ deviceGroup.forEach(
+ (deviceGroupId, schemaRegionReplicaSet) ->
+ storageGroupSchemaRegionMap
+ .computeIfAbsent(storageGroup, k -> new
HashSet<>())
+ .add(schemaRegionReplicaSet));
+ }
});
storageGroupSchemaRegionMap.forEach(
(storageGroup, schemaRegionSet) -> {
- // extract the patterns overlap with current database
- Set<PartialPath> filteredPathPatternSet = new HashSet<>();
- try {
- PartialPath storageGroupPath = new PartialPath(storageGroup);
- filteredPathPatternSet.addAll(
- patternTree.getOverlappedPathPatterns(storageGroupPath));
- filteredPathPatternSet.addAll(
- patternTree.getOverlappedPathPatterns(
- storageGroupPath.concatNode(MULTI_LEVEL_PATH_WILDCARD)));
- } catch (IllegalPathException ignored) {
- // won't reach here
- }
- List<PartialPath> filteredPathPatternList = new
ArrayList<>(filteredPathPatternSet);
-
+ List<PartialPath> filteredPathPatternList =
+ filterPathPattern(patternTree, storageGroup);
schemaRegionSet.forEach(
region -> {
- SchemaQueryScanNode schemaQueryScanNode =
(SchemaQueryScanNode) seed.clone();
- schemaQueryScanNode.setPlanNodeId(
- context.queryContext.getQueryId().genPlanNodeId());
- schemaQueryScanNode.setRegionReplicaSet(region);
-
schemaQueryScanNode.setPathPatternList(filteredPathPatternList);
- root.addChild(schemaQueryScanNode);
+ addSchemaSourceNode(
+ root,
+ filteredPathPatternList.size() == 1
+ ? filteredPathPatternList.get(0)
+ : seed.getPath(),
+ region,
+ context.queryContext.getQueryId().genPlanNodeId(),
+ seed);
});
});
+ if (!regionsOfSystemDatabase.isEmpty()) {
+ List<PartialPath> filteredPathPatternList =
+ filterPathPattern(patternTree, IoTDBMetricsUtils.DATABASE);
+ regionsOfSystemDatabase.forEach(
+ region -> {
+ addSchemaSourceNode(
+ root,
+ filteredPathPatternList.size() == 1
+ ? filteredPathPatternList.get(0)
+ : seed.getPath(),
+ region,
+ context.queryContext.getQueryId().genPlanNodeId(),
+ seed);
+ });
+ }
}
return Collections.singletonList(root);
}
+ private List<PartialPath> filterPathPattern(PathPatternTree patternTree,
String database) {
+ // extract the patterns overlap with current database
+ Set<PartialPath> filteredPathPatternSet = new HashSet<>();
+ try {
+ PartialPath storageGroupPath = new PartialPath(database);
+
filteredPathPatternSet.addAll(patternTree.getOverlappedPathPatterns(storageGroupPath));
+ filteredPathPatternSet.addAll(
+ patternTree.getOverlappedPathPatterns(
+ storageGroupPath.concatNode(MULTI_LEVEL_PATH_WILDCARD)));
+ } catch (IllegalPathException ignored) {
+ // won't reach here
+ }
+ return new ArrayList<>(filteredPathPatternSet);
+ }
+
+ private void addSchemaSourceNode(
+ SchemaQueryMergeNode root,
+ PartialPath pathPattern,
+ TRegionReplicaSet schemaRegion,
+ PlanNodeId planNodeId,
+ SchemaQueryScanNode seed) {
+ SchemaQueryScanNode schemaQueryScanNode = (SchemaQueryScanNode)
seed.clone();
+ schemaQueryScanNode.setPlanNodeId(planNodeId);
+ schemaQueryScanNode.setRegionReplicaSet(schemaRegion);
+ schemaQueryScanNode.setPath(pathPattern);
+ root.addChild(schemaQueryScanNode);
+ }
+
@Override
public List<PlanNode> visitCountMerge(
CountSchemaMergeNode node, DistributionPlanContext context) {