This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new be6e889312a [IOTDB-6325] Support RegionScan for active metadata
queries [FE Part]
be6e889312a is described below
commit be6e889312a52409b74b0d25367db7f47a5242ab
Author: YangCaiyin <[email protected]>
AuthorDate: Mon May 13 12:19:54 2024 +0800
[IOTDB-6325] Support RegionScan for active metadata queries [FE Part]
---
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 12 +-
.../queryengine/common/TimeseriesSchemaInfo.java | 123 ++++++++
.../common/schematree/ClusterSchemaTree.java | 26 ++
.../common/schematree/DeviceSchemaInfo.java | 4 +
.../queryengine/common/schematree/ISchemaTree.java | 2 +
.../common/schematree/node/SchemaInternalNode.java | 5 +
.../schematree/node/SchemaMeasurementNode.java | 6 +
.../common/schematree/node/SchemaNode.java | 2 +
.../schema/source/TimeSeriesSchemaSource.java | 2 +-
.../db/queryengine/plan/analyze/Analysis.java | 29 ++
.../queryengine/plan/analyze/AnalyzeVisitor.java | 194 +++++++++++-
.../plan/analyze/schema/ISchemaFetcher.java | 16 +
.../db/queryengine/plan/parser/ASTVisitor.java | 41 ++-
.../plan/planner/LogicalPlanBuilder.java | 21 ++
.../plan/planner/LogicalPlanVisitor.java | 37 ++-
.../distribution/DistributionPlanContext.java | 4 +
.../planner/distribution/ExchangeNodeAdder.java | 12 +
.../plan/planner/distribution/SourceRewriter.java | 65 ++++
.../plan/planner/plan/node/PlanNodeType.java | 14 +-
.../plan/planner/plan/node/PlanVisitor.java | 20 ++
.../planner/plan/node/process/RegionMergeNode.java | 128 ++++++++
.../plan/node/source/DeviceRegionScanNode.java | 175 +++++++++++
.../planner/plan/node/source/RegionScanNode.java | 94 ++++++
.../plan/node/source/TimeseriesRegionScanNode.java | 336 +++++++++++++++++++++
.../statement/metadata/CountDevicesStatement.java | 16 +
.../metadata/CountTimeSeriesStatement.java | 15 +
.../statement/metadata/ShowDevicesStatement.java | 14 +
.../metadata/ShowTimeSeriesStatement.java | 16 +-
.../distribution/RegionScanPlanningTest.java | 149 +++++++++
.../logical/RegionScanLogicalPlannerTest.java | 206 +++++++++++++
.../org/apache/iotdb/commons/path/AlignedPath.java | 6 +
.../apache/iotdb/commons/path/MeasurementPath.java | 4 +
32 files changed, 1764 insertions(+), 30 deletions(-)
diff --git
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index ec798d6a624..c3620a84317 100644
---
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -178,14 +178,18 @@ aliasClause
: ALIAS operator_eq alias
;
+timeConditionClause
+ :whereClause
+ ;
+
// ---- Show Devices
showDevices
- : SHOW DEVICES prefixPath? (WITH (STORAGE GROUP | DATABASE))?
devicesWhereClause? rowPaginationClause?
+ : SHOW DEVICES prefixPath? (WITH (STORAGE GROUP | DATABASE))?
devicesWhereClause? timeConditionClause? rowPaginationClause?
;
// ---- Show Timeseries
showTimeseries
- : SHOW LATEST? TIMESERIES prefixPath? timeseriesWhereClause?
rowPaginationClause?
+ : SHOW LATEST? TIMESERIES prefixPath? timeseriesWhereClause?
timeConditionClause? rowPaginationClause?
;
// ---- Show Child Paths
@@ -200,12 +204,12 @@ showChildNodes
// ---- Count Devices
countDevices
- : COUNT DEVICES prefixPath?
+ : COUNT DEVICES prefixPath? timeConditionClause?
;
// ---- Count Timeseries
countTimeseries
- : COUNT TIMESERIES prefixPath? timeseriesWhereClause? (GROUP BY LEVEL
operator_eq INTEGER_LITERAL)?
+ : COUNT TIMESERIES prefixPath? timeseriesWhereClause? timeConditionClause?
(GROUP BY LEVEL operator_eq INTEGER_LITERAL)?
;
// ---- Count Nodes
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/TimeseriesSchemaInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/TimeseriesSchemaInfo.java
new file mode 100644
index 00000000000..4992c8abeef
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/TimeseriesSchemaInfo.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.common;
+
+import
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
+import org.apache.iotdb.db.schemaengine.schemaregion.utils.MetaUtils;
+
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+import static
org.apache.iotdb.db.queryengine.execution.operator.schema.source.TimeSeriesSchemaSource.mapToString;
+
+public class TimeseriesSchemaInfo {
+ private final String dataType;
+ private final String encoding;
+ private final String compression;
+ private final String tags;
+
+ // TODO: Currently we can't get attributes from fetchSchema in query
+ // private final String attributes;
+
+ private final String deadband;
+ private final String deadbandParameters;
+
+ public TimeseriesSchemaInfo(IMeasurementSchemaInfo schemaInfo) {
+ this.dataType = schemaInfo.getSchema().getType().toString();
+ this.encoding = schemaInfo.getSchema().getEncodingType().toString();
+ this.compression = schemaInfo.getSchema().getCompressor().toString();
+ this.tags = mapToString(schemaInfo.getTagMap());
+ Pair<String, String> deadbandInfo =
+ MetaUtils.parseDeadbandInfo(schemaInfo.getSchema().getProps());
+ this.deadband = deadbandInfo.left == null ? "" : deadbandInfo.left;
+ this.deadbandParameters = deadbandInfo.right == null ? "" :
deadbandInfo.right;
+ }
+
+ public TimeseriesSchemaInfo(
+ String dataType,
+ String encoding,
+ String compression,
+ String tags,
+ String deadband,
+ String deadbandParameters) {
+ this.dataType = dataType;
+ this.encoding = encoding;
+ this.compression = compression;
+ this.tags = tags;
+ this.deadband = deadband;
+ this.deadbandParameters = deadbandParameters;
+ }
+
+ public void serializeAttributes(ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(dataType, byteBuffer);
+ ReadWriteIOUtils.write(encoding, byteBuffer);
+ ReadWriteIOUtils.write(compression, byteBuffer);
+ ReadWriteIOUtils.write(tags, byteBuffer);
+ ReadWriteIOUtils.write(deadband, byteBuffer);
+ ReadWriteIOUtils.write(deadbandParameters, byteBuffer);
+ }
+
+ public void serializeAttributes(DataOutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(dataType, stream);
+ ReadWriteIOUtils.write(encoding, stream);
+ ReadWriteIOUtils.write(compression, stream);
+ ReadWriteIOUtils.write(tags, stream);
+ ReadWriteIOUtils.write(deadband, stream);
+ ReadWriteIOUtils.write(deadbandParameters, stream);
+ }
+
+ public static TimeseriesSchemaInfo deserialize(ByteBuffer buffer) {
+ String dataType = ReadWriteIOUtils.readString(buffer);
+ String encoding = ReadWriteIOUtils.readString(buffer);
+ String compression = ReadWriteIOUtils.readString(buffer);
+ String tags = ReadWriteIOUtils.readString(buffer);
+ String deadband = ReadWriteIOUtils.readString(buffer);
+ String deadbandParameters = ReadWriteIOUtils.readString(buffer);
+ return new TimeseriesSchemaInfo(
+ dataType, encoding, compression, tags, deadband, deadbandParameters);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ TimeseriesSchemaInfo that = (TimeseriesSchemaInfo) obj;
+ return dataType.equals(that.dataType)
+ && encoding.equals(that.encoding)
+ && compression.equals(that.compression)
+ && tags.equals(that.tags)
+ && deadband.equals(that.deadband)
+ && deadbandParameters.equals(that.deadbandParameters);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dataType, encoding, compression, tags, deadband,
deadbandParameters);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java
index 1ffa3827fd0..995c4542c82 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java
@@ -402,6 +402,32 @@ public class ClusterSchemaTree implements ISchemaTree {
this.hasNormalTimeSeries |= schemaTree.hasNormalTimeSeries;
}
+ @Override
+ public void removeLogicalView() {
+ removeLogicViewMeasurement(root);
+ }
+
+ private void removeLogicViewMeasurement(SchemaNode parent) {
+ if (parent.isMeasurement()) {
+ return;
+ }
+
+ Map<String, SchemaNode> children = parent.getChildren();
+ List<String> childrenToBeRemoved = new ArrayList<>();
+ for (Map.Entry<String, SchemaNode> entry : children.entrySet()) {
+ SchemaNode child = entry.getValue();
+ if (child.isMeasurement() &&
child.getAsMeasurementNode().isLogicalView()) {
+ childrenToBeRemoved.add(entry.getKey());
+ } else {
+ removeLogicViewMeasurement(child);
+ }
+ }
+
+ for (String key : childrenToBeRemoved) {
+ parent.removeChild(key);
+ }
+ }
+
private void traverseAndMerge(SchemaNode thisNode, SchemaNode thisParent,
SchemaNode thatNode) {
SchemaNode thisChild;
SchemaEntityNode entityNode;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/DeviceSchemaInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/DeviceSchemaInfo.java
index 35c0e2fbbc9..8bb8a8f4ab0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/DeviceSchemaInfo.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/DeviceSchemaInfo.java
@@ -74,6 +74,10 @@ public class DeviceSchemaInfo {
.collect(Collectors.toList());
}
+ public List<IMeasurementSchemaInfo> getMeasurementSchemaInfoList() {
+ return measurementSchemaInfoList;
+ }
+
public List<MeasurementPath> getMeasurementSchemaPathList() {
return measurementSchemaInfoList.stream()
.map(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ISchemaTree.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ISchemaTree.java
index 72a646a3526..0288033b695 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ISchemaTree.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ISchemaTree.java
@@ -107,4 +107,6 @@ public interface ISchemaTree {
* @return whether there's view in this schema tree
*/
boolean hasLogicalViewMeasurement();
+
+ void removeLogicalView();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java
index 5dc3a2d8f20..5c6de241baf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaInternalNode.java
@@ -53,6 +53,11 @@ public class SchemaInternalNode extends SchemaNode {
children.replace(name, newChild);
}
+ @Override
+ public void removeChild(String name) {
+ children.remove(name);
+ }
+
@Override
public void copyDataTo(SchemaNode schemaNode) {
if (schemaNode.isMeasurement()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java
index dcf7f674771..55657a59700 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaMeasurementNode.java
@@ -93,6 +93,12 @@ public class SchemaMeasurementNode extends SchemaNode
implements IMeasurementSch
"This operation is not supported in SchemaMeasurementNode.");
}
+ @Override
+ public void removeChild(String name) {
+ throw new UnsupportedOperationException(
+ "Remove child operation is not supported in SchemaMeasurementNode.");
+ }
+
@Override
public void copyDataTo(SchemaNode schemaNode) {
if (!schemaNode.isMeasurement()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java
index dba5b09fc5d..e2625cd97ac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaNode.java
@@ -49,6 +49,8 @@ public abstract class SchemaNode implements ITreeNode {
public abstract void replaceChild(String name, SchemaNode newChild);
+ public abstract void removeChild(String name);
+
public abstract void copyDataTo(SchemaNode schemaNode);
public Map<String, SchemaNode> getChildren() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java
index 09a4c9908ca..47b73ccb0e5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java
@@ -136,7 +136,7 @@ public class TimeSeriesSchemaSource implements
ISchemaSource<ITimeSeriesSchemaIn
return schemaRegion.getSchemaRegionStatistics().getSeriesNumber(true);
}
- private String mapToString(Map<String, String> map) {
+ public static String mapToString(Map<String, String> map) {
if (map == null || map.isEmpty()) {
return null;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index d7e2163eca1..be5c98525bb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.NodeRef;
+import org.apache.iotdb.db.queryengine.common.TimeseriesSchemaInfo;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import
org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySource;
@@ -299,6 +300,28 @@ public class Analysis implements IAnalysis {
private List<String> measurementList;
private List<IMeasurementSchema> measurementSchemaList;
+ // Used for regionScan
+ private Map<PartialPath, Boolean> devicePathToAlignedStatus;
+ private Map<PartialPath, Map<PartialPath, List<TimeseriesSchemaInfo>>>
deviceToTimeseriesSchemas;
+
+ public void setDevicePathToAlignedStatus(Map<PartialPath, Boolean>
devicePathToAlignedStatus) {
+ this.devicePathToAlignedStatus = devicePathToAlignedStatus;
+ }
+
+ public Map<PartialPath, Boolean> getDevicePathToAlignedStatus() {
+ return devicePathToAlignedStatus;
+ }
+
+ public void setDeviceToTimeseriesSchemas(
+ Map<PartialPath, Map<PartialPath, List<TimeseriesSchemaInfo>>>
deviceToTimeseriesSchemas) {
+ this.deviceToTimeseriesSchemas = deviceToTimeseriesSchemas;
+ }
+
+ public Map<PartialPath, Map<PartialPath, List<TimeseriesSchemaInfo>>>
+ getDeviceToTimeseriesSchemas() {
+ return deviceToTimeseriesSchemas;
+ }
+
/////////////////////////////////////////////////////////////////////////////////////////////////
// Used in optimizer
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -313,6 +336,12 @@ public class Analysis implements IAnalysis {
return
dataPartition.getDataRegionReplicaSetWithTimeFilter(seriesPath.getDevice(),
timefilter);
}
+ public List<TRegionReplicaSet> getPartitionInfoByDevice(
+ PartialPath devicePath, Filter timefilter) {
+ return dataPartition.getDataRegionReplicaSetWithTimeFilter(
+ devicePath.getFullPath(), timefilter);
+ }
+
public TRegionReplicaSet getPartitionInfo(
PartialPath seriesPath, TTimePartitionSlot tTimePartitionSlot) {
return dataPartition.getDataRegionReplicaSet(seriesPath.getDevice(),
tTimePartitionSlot).get(0);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index ec84b871532..a7448a93529 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
@@ -48,11 +49,13 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.TimeseriesSchemaInfo;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
+import
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.execution.operator.window.WindowType;
import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
@@ -2788,6 +2791,63 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
return analysis;
}
+ private boolean analyzeTimeseriesRegionScan(
+ WhereCondition timeCondition,
+ PathPatternTree patternTree,
+ Analysis analysis,
+ MPPQueryContext context)
+ throws IllegalPathException {
+ analyzeGlobalTimeConditionInShowMetaData(timeCondition, analysis);
+ context.generateGlobalTimeFilter(analysis);
+
+ ISchemaTree schemaTree = schemaFetcher.fetchSchemaWithTags(patternTree,
false, context);
+ if (schemaTree.isEmpty()) {
+ analysis.setFinishQueryAfterAnalyze(true);
+ return false;
+ }
+ removeLogicViewMeasurement(schemaTree);
+
+ Map<PartialPath, Map<PartialPath, List<TimeseriesSchemaInfo>>>
deviceToTimeseriesSchemaInfo =
+ new HashMap<>();
+ List<DeviceSchemaInfo> deviceSchemaInfoList =
schemaTree.getMatchedDevices(ALL_MATCH_PATTERN);
+ Set<String> deviceSet = new HashSet<>();
+ for (DeviceSchemaInfo deviceSchemaInfo : deviceSchemaInfoList) {
+ boolean isAligned = deviceSchemaInfo.isAligned();
+ PartialPath devicePath = deviceSchemaInfo.getDevicePath();
+ deviceSet.add(devicePath.getFullPath());
+ if (isAligned) {
+ List<String> measurementList = new ArrayList<>();
+ List<TimeseriesSchemaInfo> timeseriesSchemaInfoList = new
ArrayList<>();
+ for (IMeasurementSchemaInfo measurementSchemaInfo :
+ deviceSchemaInfo.getMeasurementSchemaInfoList()) {
+ measurementList.add(measurementSchemaInfo.getName());
+ timeseriesSchemaInfoList.add(new
TimeseriesSchemaInfo(measurementSchemaInfo));
+ }
+ AlignedPath alignedPath = new AlignedPath(devicePath.getNodes(),
measurementList);
+ deviceToTimeseriesSchemaInfo
+ .computeIfAbsent(devicePath, k -> new HashMap<>())
+ .put(alignedPath, timeseriesSchemaInfoList);
+ } else {
+ for (IMeasurementSchemaInfo measurementSchemaInfo :
+ deviceSchemaInfo.getMeasurementSchemaInfoList()) {
+ MeasurementPath measurementPath =
+ new MeasurementPath(
+
devicePath.concatNode(measurementSchemaInfo.getName()).getNodes());
+ deviceToTimeseriesSchemaInfo
+ .computeIfAbsent(devicePath, k -> new HashMap<>())
+ .put(
+ measurementPath,
+ Collections.singletonList(new
TimeseriesSchemaInfo(measurementSchemaInfo)));
+ }
+ }
+ }
+ analysis.setDeviceToTimeseriesSchemas(deviceToTimeseriesSchemaInfo);
+ // fetch Data partition
+ DataPartition dataPartition = fetchDataPartitionByDevices(deviceSet,
schemaTree, context);
+ analysis.setDataPartitionInfo(dataPartition);
+ return true;
+ }
+
@Override
public Analysis visitShowTimeSeries(
ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext
context) {
@@ -2796,12 +2856,28 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
PathPatternTree patternTree = new PathPatternTree();
patternTree.appendPathPattern(showTimeSeriesStatement.getPathPattern());
- SchemaPartition schemaPartitionInfo =
partitionFetcher.getSchemaPartition(patternTree);
- analysis.setSchemaPartitionInfo(schemaPartitionInfo);
- Map<Integer, Template> templateMap =
-
schemaFetcher.checkAllRelatedTemplate(showTimeSeriesStatement.getPathPattern());
- analysis.setRelatedTemplateInfo(templateMap);
+ if (showTimeSeriesStatement.hasTimeCondition()) {
+ try {
+ // If there is time condition in SHOW TIMESERIES, we need to scan the
raw data
+ boolean hasSchema =
+ analyzeTimeseriesRegionScan(
+ showTimeSeriesStatement.getTimeCondition(), patternTree,
analysis, context);
+ if (!hasSchema) {
+ return analysis;
+ }
+ } catch (IllegalPathException e) {
+ throw new StatementAnalyzeException(e.getMessage());
+ }
+
+ } else {
+ SchemaPartition schemaPartitionInfo =
partitionFetcher.getSchemaPartition(patternTree);
+ analysis.setSchemaPartitionInfo(schemaPartitionInfo);
+
+ Map<Integer, Template> templateMap =
+
schemaFetcher.checkAllRelatedTemplate(showTimeSeriesStatement.getPathPattern());
+ analysis.setRelatedTemplateInfo(templateMap);
+ }
if (showTimeSeriesStatement.isOrderByHeat()) {
patternTree.constructTree();
@@ -2841,6 +2917,64 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
return analysis;
}
+ private void analyzeGlobalTimeConditionInShowMetaData(
+ WhereCondition timeCondition, Analysis analysis) {
+ Expression predicate = timeCondition.getPredicate();
+ Pair<Expression, Boolean> resultPair =
+ PredicateUtils.extractGlobalTimePredicate(predicate, true, true);
+ if (resultPair.right) {
+ throw new SemanticException(
+ "Value Filter can't exist in the condition of SHOW/COUNT clause,
only time condition supported");
+ }
+ if (resultPair.left == null) {
+ throw new SemanticException(
+ "Time condition can't be empty in the condition of SHOW/COUNT
clause");
+ }
+ Expression globalTimePredicate = resultPair.left;
+ globalTimePredicate =
PredicateUtils.predicateRemoveNot(globalTimePredicate);
+ analysis.setGlobalTimePredicate(globalTimePredicate);
+ }
+
+ private void removeLogicViewMeasurement(ISchemaTree schemaTree) {
+ if (!schemaTree.hasLogicalViewMeasurement()) {
+ return;
+ }
+ schemaTree.removeLogicalView();
+ }
+
+ private boolean analyzeDeviceRegionScan(
+ WhereCondition timeCondition,
+ PathPatternTree patternTree,
+ Analysis analysis,
+ MPPQueryContext context) {
+ // If there is time condition in SHOW DEVICES, we need to scan the raw data
+ analyzeGlobalTimeConditionInShowMetaData(timeCondition, analysis);
+ context.generateGlobalTimeFilter(analysis);
+
+ ISchemaTree schemaTree =
schemaFetcher.fetchSchemaInDeviceLevel(patternTree, context);
+ if (schemaTree.isEmpty()) {
+ analysis.setFinishQueryAfterAnalyze(true);
+ return false;
+ }
+
+ // fetch Data partition
+ List<DeviceSchemaInfo> deviceSchemaInfoList =
schemaTree.getMatchedDevices(ALL_MATCH_PATTERN);
+ Map<PartialPath, Boolean> devicePathsToAlignedStatus = new HashMap<>();
+ for (DeviceSchemaInfo deviceSchema : deviceSchemaInfoList) {
+ devicePathsToAlignedStatus.put(deviceSchema.getDevicePath(),
deviceSchema.isAligned());
+ }
+ analysis.setDevicePathToAlignedStatus(devicePathsToAlignedStatus);
+ DataPartition dataPartition =
+ fetchDataPartitionByDevices(
+ devicePathsToAlignedStatus.keySet().stream()
+ .map(PartialPath::getDevice)
+ .collect(Collectors.toSet()),
+ schemaTree,
+ context);
+ analysis.setDataPartitionInfo(dataPartition);
+ return true;
+ }
+
@Override
public Analysis visitShowDevices(
ShowDevicesStatement showDevicesStatement, MPPQueryContext context) {
@@ -2850,9 +2984,18 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
PathPatternTree patternTree = new PathPatternTree();
patternTree.appendPathPattern(
showDevicesStatement.getPathPattern().concatNode(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
- SchemaPartition schemaPartitionInfo =
partitionFetcher.getSchemaPartition(patternTree);
- analysis.setSchemaPartitionInfo(schemaPartitionInfo);
+ if (showDevicesStatement.hasTimeCondition()) {
+ boolean hasSchema =
+ analyzeDeviceRegionScan(
+ showDevicesStatement.getTimeCondition(), patternTree, analysis,
context);
+ if (!hasSchema) {
+ return analysis;
+ }
+ } else {
+ SchemaPartition schemaPartitionInfo =
partitionFetcher.getSchemaPartition(patternTree);
+ analysis.setSchemaPartitionInfo(schemaPartitionInfo);
+ }
analysis.setRespDatasetHeader(
showDevicesStatement.hasSgCol()
? DatasetHeaderFactory.getShowDevicesWithSgHeader()
@@ -2908,9 +3051,18 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
PathPatternTree patternTree = new PathPatternTree();
patternTree.appendPathPattern(
countDevicesStatement.getPathPattern().concatNode(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
- SchemaPartition schemaPartitionInfo =
partitionFetcher.getSchemaPartition(patternTree);
+ if (countDevicesStatement.hasTimeCondition()) {
+ boolean hasSchema =
+ analyzeDeviceRegionScan(
+ countDevicesStatement.getTimeCondition(), patternTree, analysis,
context);
+ if (!hasSchema) {
+ return analysis;
+ }
+ } else {
+ SchemaPartition schemaPartitionInfo =
partitionFetcher.getSchemaPartition(patternTree);
+ analysis.setSchemaPartitionInfo(schemaPartitionInfo);
+ }
- analysis.setSchemaPartitionInfo(schemaPartitionInfo);
analysis.setRespDatasetHeader(DatasetHeaderFactory.getCountDevicesHeader());
return analysis;
}
@@ -2923,13 +3075,25 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
PathPatternTree patternTree = new PathPatternTree();
patternTree.appendPathPattern(countTimeSeriesStatement.getPathPattern());
- SchemaPartition schemaPartitionInfo =
partitionFetcher.getSchemaPartition(patternTree);
- analysis.setSchemaPartitionInfo(schemaPartitionInfo);
-
- Map<Integer, Template> templateMap =
-
schemaFetcher.checkAllRelatedTemplate(countTimeSeriesStatement.getPathPattern());
- analysis.setRelatedTemplateInfo(templateMap);
+ if (countTimeSeriesStatement.hasTimeCondition()) {
+ try {
+ boolean hasSchema =
+ analyzeTimeseriesRegionScan(
+ countTimeSeriesStatement.getTimeCondition(), patternTree,
analysis, context);
+ if (!hasSchema) {
+ return analysis;
+ }
+ } catch (IllegalPathException e) {
+ throw new StatementAnalyzeException(e.getMessage());
+ }
+ } else {
+ SchemaPartition schemaPartitionInfo =
partitionFetcher.getSchemaPartition(patternTree);
+ analysis.setSchemaPartitionInfo(schemaPartitionInfo);
+ Map<Integer, Template> templateMap =
+
schemaFetcher.checkAllRelatedTemplate(countTimeSeriesStatement.getPathPattern());
+ analysis.setRelatedTemplateInfo(templateMap);
+ }
analysis.setRespDatasetHeader(DatasetHeaderFactory.getCountTimeSeriesHeader());
return analysis;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ISchemaFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ISchemaFetcher.java
index 84c6e29d7c2..a51f8af0ab1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ISchemaFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ISchemaFetcher.java
@@ -47,6 +47,22 @@ public interface ISchemaFetcher {
ISchemaTree fetchSchema(
PathPatternTree patternTree, boolean withTemplate, MPPQueryContext
context);
+ /**
+ * TODO need to be implemented in schema engine
+ *
+ * <p>Fetch all the schema by the given patternTree in device level
+ *
+ * @return schemaTree without measurement nodes
+ */
+ default ISchemaTree fetchSchemaInDeviceLevel(
+ PathPatternTree patternTree, MPPQueryContext context) {
+ ISchemaTree schemaTree = fetchSchema(patternTree, false, context);
+ if (schemaTree.hasLogicalViewMeasurement()) {
+ schemaTree.removeLogicalView();
+ }
+ return schemaTree;
+ }
+
/**
* Fetch all the schema with tags of existing timeseries matched by the
given patternTree
*
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index 39f1004a3b8..3d3efc1b8c3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -620,9 +620,17 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
new PartialPath(SqlConstant.getSingleRootArray()), orderByHeat);
}
if (ctx.timeseriesWhereClause() != null) {
+ if (ctx.timeConditionClause() != null) {
+ throw new SemanticException(
+ "TIMESERIES condition and TIME condition cannot be used at the
same time.");
+ }
SchemaFilter schemaFilter =
parseTimeseriesWhereClause(ctx.timeseriesWhereClause());
showTimeSeriesStatement.setSchemaFilter(schemaFilter);
}
+ if (ctx.timeConditionClause() != null) {
+ showTimeSeriesStatement.setTimeCondition(
+ parseWhereClause(ctx.timeConditionClause().whereClause()));
+ }
if (ctx.rowPaginationClause() != null) {
if (ctx.rowPaginationClause().limitClause() != null) {
showTimeSeriesStatement.setLimit(parseLimitClause(ctx.rowPaginationClause().limitClause()));
@@ -706,9 +714,16 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
new ShowDevicesStatement(new
PartialPath(SqlConstant.getSingleRootArray()));
}
if (ctx.devicesWhereClause() != null) {
+ if (ctx.timeConditionClause() != null) {
+ throw new SemanticException(
+ "DEVICE condition and TIME condition cannot be used at the same
time.");
+ }
showDevicesStatement.setSchemaFilter(parseDevicesWhereClause(ctx.devicesWhereClause()));
}
-
+ if (ctx.timeConditionClause() != null) {
+ showDevicesStatement.setTimeCondition(
+ parseWhereClause(ctx.timeConditionClause().whereClause()));
+ }
if (ctx.rowPaginationClause() != null) {
if (ctx.rowPaginationClause().limitClause() != null) {
showDevicesStatement.setLimit(parseLimitClause(ctx.rowPaginationClause().limitClause()));
@@ -756,26 +771,44 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
} else {
path = new PartialPath(SqlConstant.getSingleRootArray());
}
- return new CountDevicesStatement(path);
+ CountDevicesStatement statement = new CountDevicesStatement(path);
+ if (ctx.timeConditionClause() != null) {
+ WhereCondition timeCondition =
parseWhereClause(ctx.timeConditionClause().whereClause());
+ statement.setTimeCondition(timeCondition);
+ }
+ return statement;
}
// Count TimeSeries
========================================================================
@Override
public Statement visitCountTimeseries(CountTimeseriesContext ctx) {
- Statement statement;
+ Statement statement = null;
PartialPath path;
if (ctx.prefixPath() != null) {
path = parsePrefixPath(ctx.prefixPath());
} else {
path = new PartialPath(SqlConstant.getSingleRootArray());
}
+ if (ctx.timeConditionClause() != null) {
+ statement = new CountTimeSeriesStatement(path);
+ WhereCondition timeCondition =
parseWhereClause(ctx.timeConditionClause().whereClause());
+ ((CountTimeSeriesStatement) statement).setTimeCondition(timeCondition);
+ }
if (ctx.INTEGER_LITERAL() != null) {
+ if (ctx.timeConditionClause() != null) {
+ throw new SemanticException(
+ "TIME condition and GROUP BY LEVEL cannot be used at the same
time.");
+ }
int level = Integer.parseInt(ctx.INTEGER_LITERAL().getText());
statement = new CountLevelTimeSeriesStatement(path, level);
- } else {
+ } else if (statement == null) {
statement = new CountTimeSeriesStatement(path);
}
if (ctx.timeseriesWhereClause() != null) {
+ if (ctx.timeConditionClause() != null) {
+ throw new SemanticException(
+ "TIMESERIES condition and TIME condition cannot be used at the
same time.");
+ }
SchemaFilter schemaFilter =
parseTimeseriesWhereClause(ctx.timeseriesWhereClause());
if (statement instanceof CountTimeSeriesStatement) {
((CountTimeSeriesStatement) statement).setSchemaFilter(schemaFilter);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index d457c432cf0..b1786c85112 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.schema.filter.SchemaFilter;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.TimeseriesSchemaInfo;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import
org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil;
@@ -78,9 +79,11 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
@@ -1335,4 +1338,22 @@ public class LogicalPlanBuilder {
new SlidingTimeColumnGeneratorParameter(groupByTimeParameter,
ascending));
return this;
}
+
+ public LogicalPlanBuilder planDeviceRegionScan(
+ Map<PartialPath, Boolean> devicePathToAlignedStatus, boolean
outputCount) {
+ this.root =
+ new DeviceRegionScanNode(
+ context.getQueryId().genPlanNodeId(), devicePathToAlignedStatus,
outputCount, null);
+ return this;
+ }
+
+ public LogicalPlanBuilder planTimeseriesRegionScan(
+ Map<PartialPath, Map<PartialPath, List<TimeseriesSchemaInfo>>>
deviceToTimeseriesSchemaInfo,
+ boolean outputCount) {
+ TimeseriesRegionScanNode timeseriesRegionScanNode =
+ new TimeseriesRegionScanNode(context.getQueryId().genPlanNodeId(),
outputCount, null);
+
timeseriesRegionScanNode.setDeviceToTimeseriesSchemaInfo(deviceToTimeseriesSchemaInfo);
+ this.root = timeseriesRegionScanNode;
+ return this;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index 664924b2004..08990c5df47 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -513,6 +513,17 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext
context) {
LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
+ long limit = showTimeSeriesStatement.getLimit();
+ long offset = showTimeSeriesStatement.getOffset();
+ if (showTimeSeriesStatement.hasTimeCondition()) {
+ planBuilder =
+ planBuilder
+
.planTimeseriesRegionScan(analysis.getDeviceToTimeseriesSchemas(), false)
+ .planLimit(limit)
+ .planOffset(offset);
+ return planBuilder.getRoot();
+ }
+
// If there is only one region, we can push down the offset and limit
operation to
// source operator.
boolean canPushDownOffsetLimit =
@@ -520,8 +531,6 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
&& analysis.getSchemaPartitionInfo().getDistributionInfo().size()
== 1
&& !showTimeSeriesStatement.isOrderByHeat();
- long limit = showTimeSeriesStatement.getLimit();
- long offset = showTimeSeriesStatement.getOffset();
if (showTimeSeriesStatement.isOrderByHeat()) {
limit = 0;
offset = 0;
@@ -545,7 +554,7 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
// show latest timeseries
if (showTimeSeriesStatement.isOrderByHeat()
&& null != analysis.getDataPartitionInfo()
- && 0 != analysis.getDataPartitionInfo().getDataPartitionMap().size()) {
+ && !analysis.getDataPartitionInfo().getDataPartitionMap().isEmpty()) {
PlanNode lastPlanNode =
new LogicalPlanBuilder(analysis, context).planLast(analysis,
null).getRoot();
planBuilder = planBuilder.planSchemaQueryOrderByHeat(lastPlanNode);
@@ -566,6 +575,15 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
ShowDevicesStatement showDevicesStatement, MPPQueryContext context) {
LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
+ if (showDevicesStatement.hasTimeCondition()) {
+ planBuilder =
+ planBuilder
+ .planDeviceRegionScan(analysis.getDevicePathToAlignedStatus(),
false)
+ .planLimit(showDevicesStatement.getLimit())
+ .planOffset(showDevicesStatement.getOffset());
+ return planBuilder.getRoot();
+ }
+
// If there is only one region, we can push down the offset and limit
operation to
// source operator.
boolean canPushDownOffsetLimit =
@@ -604,6 +622,12 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
public PlanNode visitCountDevices(
CountDevicesStatement countDevicesStatement, MPPQueryContext context) {
LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
+
+ if (countDevicesStatement.hasTimeCondition()) {
+ planBuilder =
planBuilder.planDeviceRegionScan(analysis.getDevicePathToAlignedStatus(), true);
+ return planBuilder.getRoot();
+ }
+
return planBuilder
.planDevicesCountSource(
countDevicesStatement.getPathPattern(),
@@ -617,6 +641,13 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
public PlanNode visitCountTimeSeries(
CountTimeSeriesStatement countTimeSeriesStatement, MPPQueryContext
context) {
LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
+
+ if (countTimeSeriesStatement.hasTimeCondition()) {
+ planBuilder =
+
planBuilder.planTimeseriesRegionScan(analysis.getDeviceToTimeseriesSchemas(),
true);
+ return planBuilder.getRoot();
+ }
+
return planBuilder
.planTimeSeriesCountSource(
countTimeSeriesStatement.getPathPattern(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
index dfb342c61ea..6c49e4478b8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
@@ -86,4 +86,8 @@ public class DistributionPlanContext {
public Filter getPartitionTimeFilter() {
return queryContext.getGlobalTimeFilter();
}
+
+ public boolean isOneSeriesInMultiRegion() {
+ return oneSeriesInMultiRegion;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index 958e85592ea..9771a610261 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -47,6 +47,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortN
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RegionMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
@@ -63,6 +64,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLast
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.RegionScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
@@ -165,6 +167,16 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
return processNoChildSourceNode(node, context);
}
+ @Override
+ public PlanNode visitRegionMerge(RegionMergeNode node, NodeGroupContext
context) {
+ return processMultiChildNode(node, context);
+ }
+
+ @Override
+ public PlanNode visitRegionScan(RegionScanNode node, NodeGroupContext
context) {
+ return processNoChildSourceNode(node, context);
+ }
+
@Override
public PlanNode visitAlignedSeriesScan(AlignedSeriesScanNode node,
NodeGroupContext context) {
return processNoChildSourceNode(node, context);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 4a981d45125..3ee8167c662 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -48,6 +48,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RegionMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
@@ -60,12 +61,15 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.RegionScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationSourceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesSourceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
@@ -709,6 +713,37 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
return processRawSeriesScan(node, context, mergeNode);
}
+ private List<PlanNode> processRegionScan(RegionScanNode node,
DistributionPlanContext context) {
+ List<PlanNode> planNodeList = splitRegionScanNodeByRegion(node, context);
+ if (planNodeList.size() == 1) {
+ return planNodeList;
+ }
+
+ boolean outputCountInScanNode = node.isOutputCount() &&
!context.isOneSeriesInMultiRegion();
+ RegionMergeNode regionMergeNode =
+ new RegionMergeNode(
+ context.queryContext.getQueryId().genPlanNodeId(),
+ node.isOutputCount(),
+ !outputCountInScanNode);
+ for (PlanNode planNode : planNodeList) {
+ ((RegionScanNode) planNode).setOutputCount(outputCountInScanNode);
+ regionMergeNode.addChild(planNode);
+ }
+ return Collections.singletonList(regionMergeNode);
+ }
+
+ @Override
+ public List<PlanNode> visitDeviceRegionScan(
+ DeviceRegionScanNode node, DistributionPlanContext context) {
+ return processRegionScan(node, context);
+ }
+
+ @Override
+ public List<PlanNode> visitTimeSeriesRegionScan(
+ TimeseriesRegionScanNode node, DistributionPlanContext context) {
+ return processRegionScan(node, context);
+ }
+
private List<PlanNode> processRawSeriesScan(
SeriesSourceNode node, DistributionPlanContext context,
MultiChildProcessNode parent) {
List<PlanNode> sourceNodes = splitSeriesSourceNodeByPartition(node,
context);
@@ -719,6 +754,36 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
return Collections.singletonList(parent);
}
+ private List<PlanNode> splitRegionScanNodeByRegion(
+ RegionScanNode node, DistributionPlanContext context) {
+ Map<TRegionReplicaSet, RegionScanNode> regionScanNodeMap = new HashMap<>();
+ Set<PartialPath> devicesList = node.getDevicePaths();
+ boolean isAllDeviceOnlyInOneRegion = true;
+
+ for (PartialPath device : devicesList) {
+ List<TRegionReplicaSet> dataDistribution =
+ analysis.getPartitionInfoByDevice(device,
context.getPartitionTimeFilter());
+ isAllDeviceOnlyInOneRegion = isAllDeviceOnlyInOneRegion &&
dataDistribution.size() == 1;
+ for (TRegionReplicaSet dataRegion : dataDistribution) {
+ regionScanNodeMap
+ .computeIfAbsent(
+ dataRegion,
+ k -> {
+ RegionScanNode regionScanNode = (RegionScanNode)
node.clone();
+
regionScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+ regionScanNode.setRegionReplicaSet(dataRegion);
+ regionScanNode.clearPath();
+ return regionScanNode;
+ })
+ .addDevicePath(device, node);
+ }
+ }
+
+ context.setOneSeriesInMultiRegion(!isAllDeviceOnlyInOneRegion);
+ // If there is only one region, return directly
+ return new ArrayList<>(regionScanNodeMap.values());
+ }
+
private List<PlanNode> splitSeriesSourceNodeByPartition(
SeriesSourceNode node, DistributionPlanContext context) {
List<PlanNode> ret = new ArrayList<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 41d912e145c..0fd420ce1bb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -79,6 +79,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortN
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RegionMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
@@ -96,10 +97,12 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNo
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
@@ -213,7 +216,10 @@ public enum PlanNodeType {
PIPE_OPERATE_SCHEMA_QUEUE_REFERENCE((short) 91),
RAW_DATA_AGGREGATION((short) 92),
- ;
+
+ DEVICE_REGION_SCAN((short) 93),
+ TIMESERIES_REGION_SCAN((short) 94),
+ REGION_MERGE((short) 95);
public static final int BYTES = Short.BYTES;
@@ -450,6 +456,12 @@ public enum PlanNodeType {
return PipeOperateSchemaQueueNode.deserialize(buffer);
case 92:
return RawDataAggregationNode.deserialize(buffer);
+ case 93:
+ return DeviceRegionScanNode.deserialize(buffer);
+ case 94:
+ return TimeseriesRegionScanNode.deserialize(buffer);
+ case 95:
+ return RegionMergeNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 932a9d22597..197c38c9cab 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -77,6 +77,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChild
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RegionMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
@@ -96,13 +97,16 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNo
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.RegionScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationSourceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanSourceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
@@ -161,6 +165,18 @@ public abstract class PlanVisitor<R, C> {
return visitSourceNode(node, context);
}
+ public R visitRegionScan(RegionScanNode node, C context) {
+ return visitSourceNode(node, context);
+ }
+
+ public R visitDeviceRegionScan(DeviceRegionScanNode node, C context) {
+ return visitRegionScan(node, context);
+ }
+
+ public R visitTimeSeriesRegionScan(TimeseriesRegionScanNode node, C context)
{
+ return visitRegionScan(node, context);
+ }
+
// single child
--------------------------------------------------------------------------------
public R visitSingleChildProcess(SingleChildProcessNode node, C context) {
@@ -303,6 +319,10 @@ public abstract class PlanVisitor<R, C> {
return visitMultiChildProcess(node, context);
}
+ public R visitRegionMerge(RegionMergeNode node, C context) {
+ return visitMultiChildProcess(node, context);
+ }
+
// others
-----------------------------------------------------------------------------------
public R visitShowQueries(ShowQueriesNode node, C context) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RegionMergeNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RegionMergeNode.java
new file mode 100644
index 00000000000..42f7bd09de9
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RegionMergeNode.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class RegionMergeNode extends MultiChildProcessNode {
+
+ private final boolean outputCount;
+
+ private final boolean needMerge;
+
+ protected RegionMergeNode(
+ PlanNodeId id, List<PlanNode> children, boolean outputCount, boolean
needMerge) {
+ super(id, children);
+ this.outputCount = outputCount;
+ this.needMerge = needMerge;
+ }
+
+ public RegionMergeNode(PlanNodeId id, boolean outputCount, boolean
needMerge) {
+ super(id);
+ this.outputCount = outputCount;
+ this.needMerge = needMerge;
+ }
+
+ @Override
+ public PlanNode clone() {
+ return new RegionMergeNode(this.id, outputCount, needMerge);
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return outputCount
+ ? ColumnHeaderConstant.countDevicesColumnHeaders.stream()
+ .map(ColumnHeader::getColumnName)
+ .collect(Collectors.toList())
+ : ColumnHeaderConstant.showDevicesColumnHeaders.stream()
+ .map(ColumnHeader::getColumnName)
+ .collect(Collectors.toList());
+ }
+
+ public static RegionMergeNode deserialize(ByteBuffer byteBuffer) {
+ boolean outputCount = byteBuffer.get() == 1;
+ boolean needMerge = byteBuffer.get() == 1;
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new RegionMergeNode(planNodeId, outputCount, needMerge);
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.REGION_MERGE.serialize(byteBuffer);
+ byteBuffer.put((byte) (outputCount ? 1 : 0));
+ byteBuffer.put((byte) (needMerge ? 1 : 0));
+ }
+
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws
IOException {
+ PlanNodeType.REGION_MERGE.serialize(stream);
+ stream.writeBoolean(outputCount);
+ stream.writeBoolean(needMerge);
+ }
+
+ @Override
+ public String toString() {
+ return "RegionMergeNode{"
+ + "outputCount="
+ + outputCount
+ + ", needMerge="
+ + needMerge
+ + ", id="
+ + id
+ + '}';
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitRegionMerge(this, context);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ RegionMergeNode that = (RegionMergeNode) o;
+ return outputCount == that.outputCount && needMerge == that.needMerge;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), outputCount, needMerge);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/DeviceRegionScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/DeviceRegionScanNode.java
new file mode 100644
index 00000000000..1971380f805
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/DeviceRegionScanNode.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.source;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeUtil;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DeviceRegionScanNode extends RegionScanNode {
+ private Map<PartialPath, Boolean> devicePathsToAligned;
+
+ public DeviceRegionScanNode(
+ PlanNodeId planNodeId,
+ Map<PartialPath, Boolean> devicePathsToAligned,
+ boolean outputCount,
+ TRegionReplicaSet regionReplicaSet) {
+ super(planNodeId);
+ this.devicePathsToAligned = devicePathsToAligned;
+ this.regionReplicaSet = regionReplicaSet;
+ this.outputCount = outputCount;
+ }
+
+ public Map<PartialPath, Boolean> getDevicePathsToAligned() {
+ return devicePathsToAligned;
+ }
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return ImmutableList.of();
+ }
+
+ @Override
+ public void addChild(PlanNode child) {
+ throw new UnsupportedOperationException("DeviceRegionScanNode has no
children");
+ }
+
+ @Override
+ public PlanNode clone() {
+ return new DeviceRegionScanNode(
+ getPlanNodeId(), getDevicePathsToAligned(), isOutputCount(),
getRegionReplicaSet());
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return outputCount
+ ? ColumnHeaderConstant.countDevicesColumnHeaders.stream()
+ .map(ColumnHeader::getColumnName)
+ .collect(Collectors.toList())
+ : ColumnHeaderConstant.showDevicesColumnHeaders.stream()
+ .map(ColumnHeader::getColumnName)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitDeviceRegionScan(this, context);
+ }
+
+ @Override
+ public int allowedChildCount() {
+ return NO_CHILD_ALLOWED;
+ }
+
+ public static PlanNode deserialize(ByteBuffer buffer) {
+ int size = ReadWriteIOUtils.readInt(buffer);
+ Map<PartialPath, Boolean> devicePathsToAligned = new HashMap<>();
+ for (int i = 0; i < size; i++) {
+ PartialPath path = PartialPath.deserialize(buffer);
+ boolean aligned = ReadWriteIOUtils.readBool(buffer);
+ devicePathsToAligned.put(path, aligned);
+ }
+ boolean outputCount = ReadWriteIOUtils.readBool(buffer);
+ PlanNodeId planNodeId = PlanNodeId.deserialize(buffer);
+ return new DeviceRegionScanNode(planNodeId, devicePathsToAligned,
outputCount, null);
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.DEVICE_REGION_SCAN.serialize(byteBuffer);
+ ReadWriteIOUtils.write(devicePathsToAligned.size(), byteBuffer);
+ for (Map.Entry<PartialPath, Boolean> entry :
devicePathsToAligned.entrySet()) {
+ entry.getKey().serialize(byteBuffer);
+ ReadWriteIOUtils.write(entry.getValue(), byteBuffer);
+ }
+ ReadWriteIOUtils.write(outputCount, byteBuffer);
+ }
+
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws
IOException {
+ PlanNodeType.DEVICE_REGION_SCAN.serialize(stream);
+ ReadWriteIOUtils.write(devicePathsToAligned.size(), stream);
+ for (Map.Entry<PartialPath, Boolean> entry :
devicePathsToAligned.entrySet()) {
+ entry.getKey().serialize(stream);
+ ReadWriteIOUtils.write(entry.getValue(), stream);
+ }
+ ReadWriteIOUtils.write(outputCount, stream);
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "DeviceRegionScanNode-%s:[DataRegion: %s OutputCount: %s]",
+ this.getPlanNodeId(),
+ PlanNodeUtil.printRegionReplicaSet(getRegionReplicaSet()),
+ outputCount);
+ }
+
+ @Override
+ public Set<PartialPath> getDevicePaths() {
+ return new HashSet<>(devicePathsToAligned.keySet());
+ }
+
+ @Override
+ public void addDevicePath(PartialPath devicePath, RegionScanNode node) {
+ this.devicePathsToAligned.put(
+ devicePath, ((DeviceRegionScanNode)
node).devicePathsToAligned.get(devicePath));
+ }
+
+ @Override
+ public void clearPath() {
+ this.devicePathsToAligned = new HashMap<>();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+ DeviceRegionScanNode that = (DeviceRegionScanNode) o;
+ return devicePathsToAligned.equals(that.devicePathsToAligned)
+ && outputCount == that.isOutputCount();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), devicePathsToAligned, outputCount);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/RegionScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/RegionScanNode.java
new file mode 100644
index 00000000000..866d85a4f03
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/RegionScanNode.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.source;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+
+import java.util.Objects;
+import java.util.Set;
+
+public abstract class RegionScanNode extends SourceNode {
+
+ protected TRegionReplicaSet regionReplicaSet;
+ protected boolean outputCount = false;
+
+ protected RegionScanNode(PlanNodeId id) {
+ super(id);
+ }
+
+ @Override
+ public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+ this.regionReplicaSet = regionReplicaSet;
+ }
+
+ @Override
+ public TRegionReplicaSet getRegionReplicaSet() {
+ return regionReplicaSet;
+ }
+
+ public boolean isOutputCount() {
+ return outputCount;
+ }
+
+ public void setOutputCount(boolean outputCount) {
+ this.outputCount = outputCount;
+ }
+
+ public abstract Set<PartialPath> getDevicePaths();
+
+ public abstract void addDevicePath(PartialPath devicePath, RegionScanNode
node);
+
+ public abstract void clearPath();
+
+ @Override
+ public void open() throws Exception {}
+
+ @Override
+ public void close() throws Exception {}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ RegionScanNode that = (RegionScanNode) o;
+ return Objects.equals(regionReplicaSet, that.regionReplicaSet)
+ && Objects.equals(outputCount, that.outputCount);
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitRegionScan(this, context);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), regionReplicaSet, outputCount);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/TimeseriesRegionScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/TimeseriesRegionScanNode.java
new file mode 100644
index 00000000000..cbcc3138d58
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/TimeseriesRegionScanNode.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.source;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathType;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.queryengine.common.TimeseriesSchemaInfo;
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class TimeseriesRegionScanNode extends RegionScanNode {
+ // IDeviceID -> (MeasurementPath -> TimeseriesSchemaInfo)
+ private Map<PartialPath, Map<PartialPath, List<TimeseriesSchemaInfo>>>
+ deviceToTimeseriesSchemaInfo;
+
+ public TimeseriesRegionScanNode(
+ PlanNodeId planNodeId, boolean outputCount, TRegionReplicaSet
regionReplicaSet) {
+ super(planNodeId);
+ this.regionReplicaSet = regionReplicaSet;
+ this.outputCount = outputCount;
+ }
+
+ public TimeseriesRegionScanNode(
+ PlanNodeId planNodeId,
+ Map<PartialPath, Map<PartialPath, List<TimeseriesSchemaInfo>>>
deviceToTimeseriesSchemaInfo,
+ boolean outputCount,
+ TRegionReplicaSet regionReplicaSet) {
+ super(planNodeId);
+ this.deviceToTimeseriesSchemaInfo = deviceToTimeseriesSchemaInfo;
+ this.regionReplicaSet = regionReplicaSet;
+ this.outputCount = outputCount;
+ }
+
+ public void setDeviceToTimeseriesSchemaInfo(
+ Map<PartialPath, Map<PartialPath, List<TimeseriesSchemaInfo>>>
deviceToTimeseriesSchemaInfo) {
+ this.deviceToTimeseriesSchemaInfo = deviceToTimeseriesSchemaInfo;
+ }
+
+ public Map<PartialPath, Map<PartialPath, List<TimeseriesSchemaInfo>>>
+ getDeviceToTimeseriesSchemaInfo() {
+ return deviceToTimeseriesSchemaInfo;
+ }
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return ImmutableList.of();
+ }
+
+ @Override
+ public void addChild(PlanNode child) {
+ throw new UnsupportedOperationException("TimeseriesRegionScanNode does not
support addChild");
+ }
+
+ @Override
+ public PlanNode clone() {
+ return new TimeseriesRegionScanNode(
+ getPlanNodeId(), getDeviceToTimeseriesSchemaInfo(), isOutputCount(),
getRegionReplicaSet());
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return outputCount
+ ? ColumnHeaderConstant.countTimeSeriesColumnHeaders.stream()
+ .map(ColumnHeader::getColumnName)
+ .collect(Collectors.toList())
+ : ColumnHeaderConstant.showTimeSeriesColumnHeaders.stream()
+ .map(ColumnHeader::getColumnName)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitTimeSeriesRegionScan(this, context);
+ }
+
+ @Override
+ public int allowedChildCount() {
+ return NO_CHILD_ALLOWED;
+ }
+
+ public static PlanNode deserialize(ByteBuffer buffer) {
+ int size = ReadWriteIOUtils.readInt(buffer);
+ Map<PartialPath, Map<PartialPath, List<TimeseriesSchemaInfo>>>
deviceToTimeseriesSchemaInfo =
+ new HashMap<>();
+ for (int i = 0; i < size; i++) {
+
+ int nodeSize = ReadWriteIOUtils.readInt(buffer);
+ String[] nodes = new String[nodeSize];
+ for (int j = 0; j < nodeSize; j++) {
+ nodes[j] = ReadWriteIOUtils.readString(buffer);
+ }
+ PartialPath devicePath = new PartialPath(nodes);
+
+ int pathSize = ReadWriteIOUtils.readInt(buffer);
+ Map<PartialPath, List<TimeseriesSchemaInfo>> measurementToSchemaInfo =
new HashMap<>();
+ for (int j = 0; j < pathSize; j++) {
+ PartialPath path = deserializePartialPath(nodes, buffer);
+ int schemaSize = ReadWriteIOUtils.readInt(buffer);
+ List<TimeseriesSchemaInfo> schemaInfos = new ArrayList<>();
+ for (int k = 0; k < schemaSize; k++) {
+ schemaInfos.add(TimeseriesSchemaInfo.deserialize(buffer));
+ }
+ measurementToSchemaInfo.put(path, schemaInfos);
+ }
+ deviceToTimeseriesSchemaInfo.put(devicePath, measurementToSchemaInfo);
+ }
+ boolean outputCount = ReadWriteIOUtils.readBool(buffer);
+ return new TimeseriesRegionScanNode(
+ PlanNodeId.deserialize(buffer), deviceToTimeseriesSchemaInfo,
outputCount, null);
+ }
+
+ private static PartialPath deserializePartialPath(String[] deviceNodes,
ByteBuffer buffer) {
+ byte pathType = buffer.get();
+ if (pathType == 0) {
+ String[] newNodes = Arrays.copyOf(deviceNodes, deviceNodes.length + 1);
+ newNodes[deviceNodes.length] = ReadWriteIOUtils.readString(buffer);
+ return new MeasurementPath(newNodes);
+ } else {
+ int size = ReadWriteIOUtils.readInt(buffer);
+ List<String> measurements = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ measurements.add(ReadWriteIOUtils.readString(buffer));
+ }
+ return new AlignedPath(deviceNodes, measurements);
+ }
+ }
+
+ @TestOnly
+ public List<PartialPath> getMeasurementPath() {
+ return deviceToTimeseriesSchemaInfo.values().stream()
+ .map(Map::keySet)
+ .flatMap(Set::stream)
+ .flatMap(
+ path -> {
+ if (path instanceof AlignedPath) {
+ AlignedPath alignedPath = (AlignedPath) path;
+ return alignedPath.getMeasurementList().stream()
+ .map(
+ measurementName -> {
+ try {
+ return new PartialPath(alignedPath.getDevice(),
measurementName);
+ } catch (IllegalPathException e) {
+ return null;
+ }
+ });
+ } else {
+ return Stream.of(path);
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "%s[%s]",
+ getClass().getSimpleName(),
+ deviceToTimeseriesSchemaInfo.entrySet().stream()
+ .map(
+ entry ->
+ String.format(
+ "%s -> %s",
+ entry.getKey().getFullPath(),
+ entry.getValue().entrySet().stream()
+ .map(
+ entry1 ->
+ String.format(
+ "%s -> %s",
+ entry1.getKey().getFullPath(),
+ entry1.getValue().stream()
+
.map(TimeseriesSchemaInfo::toString)
+ .collect(Collectors.joining(",
"))))
+ .collect(Collectors.joining(", "))))
+ .collect(Collectors.joining(", ")));
+ }
+
+ @Override
+ public Set<PartialPath> getDevicePaths() {
+ return new HashSet<>(deviceToTimeseriesSchemaInfo.keySet());
+ }
+
+ @Override
+ public void addDevicePath(PartialPath devicePath, RegionScanNode node) {
+ this.deviceToTimeseriesSchemaInfo.put(
+ devicePath,
+ ((TimeseriesRegionScanNode)
node).getDeviceToTimeseriesSchemaInfo().get(devicePath));
+ }
+
+ @Override
+ public void clearPath() {
+ this.deviceToTimeseriesSchemaInfo = new HashMap<>();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof TimeseriesRegionScanNode)) {
+ return false;
+ }
+ TimeseriesRegionScanNode that = (TimeseriesRegionScanNode) o;
+ return
deviceToTimeseriesSchemaInfo.equals(that.deviceToTimeseriesSchemaInfo)
+ && outputCount == that.isOutputCount();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), deviceToTimeseriesSchemaInfo,
outputCount);
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.TIMESERIES_REGION_SCAN.serialize(byteBuffer);
+ ReadWriteIOUtils.write(deviceToTimeseriesSchemaInfo.size(), byteBuffer);
+ for (Map.Entry<PartialPath, Map<PartialPath, List<TimeseriesSchemaInfo>>>
entry :
+ deviceToTimeseriesSchemaInfo.entrySet()) {
+
+ int size = entry.getKey().getNodeLength();
+ ReadWriteIOUtils.write(size, byteBuffer);
+ String[] nodes = entry.getKey().getNodes();
+ for (int i = 0; i < size; i++) {
+ ReadWriteIOUtils.write(nodes[i], byteBuffer);
+ }
+
+ ReadWriteIOUtils.write(entry.getValue().size(), byteBuffer);
+ for (Map.Entry<PartialPath, List<TimeseriesSchemaInfo>> timseriesEntry :
+ entry.getValue().entrySet()) {
+ serializeMeasurements(timseriesEntry.getKey(), byteBuffer);
+ ReadWriteIOUtils.write(timseriesEntry.getValue().size(), byteBuffer);
+ for (TimeseriesSchemaInfo timeseriesSchemaInfo :
timseriesEntry.getValue()) {
+ timeseriesSchemaInfo.serializeAttributes(byteBuffer);
+ }
+ }
+ }
+ ReadWriteIOUtils.write(outputCount, byteBuffer);
+ }
+
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws
IOException {
+ PlanNodeType.TIMESERIES_REGION_SCAN.serialize(stream);
+ ReadWriteIOUtils.write(deviceToTimeseriesSchemaInfo.size(), stream);
+ for (Map.Entry<PartialPath, Map<PartialPath, List<TimeseriesSchemaInfo>>>
entry :
+ deviceToTimeseriesSchemaInfo.entrySet()) {
+
+ int size = entry.getKey().getNodeLength();
+ ReadWriteIOUtils.write(size, stream);
+ String[] nodes = entry.getKey().getNodes();
+ for (int i = 0; i < size; i++) {
+ ReadWriteIOUtils.write(nodes[i], stream);
+ }
+
+ ReadWriteIOUtils.write(entry.getValue().size(), stream);
+ for (Map.Entry<PartialPath, List<TimeseriesSchemaInfo>> timseriesEntry :
+ entry.getValue().entrySet()) {
+ serializeMeasurements(timseriesEntry.getKey(), stream);
+ ReadWriteIOUtils.write(timseriesEntry.getValue().size(), stream);
+ for (TimeseriesSchemaInfo timeseriesSchemaInfo :
timseriesEntry.getValue()) {
+ timeseriesSchemaInfo.serializeAttributes(stream);
+ }
+ }
+ }
+ ReadWriteIOUtils.write(outputCount, stream);
+ }
+
+ private void serializeMeasurements(PartialPath path, DataOutputStream
stream) throws IOException {
+ if (path instanceof MeasurementPath) {
+ PathType.Measurement.serialize(stream);
+ ReadWriteIOUtils.write(path.getMeasurement(), stream);
+ } else if (path instanceof AlignedPath) {
+ PathType.Aligned.serialize(stream);
+ AlignedPath alignedPath = (AlignedPath) path;
+ ReadWriteIOUtils.write(alignedPath.getMeasurementList().size(), stream);
+ for (String measurement : alignedPath.getMeasurementList()) {
+ ReadWriteIOUtils.write(measurement, stream);
+ }
+ }
+ }
+
+ private void serializeMeasurements(PartialPath path, ByteBuffer buffer) {
+ if (path instanceof MeasurementPath) {
+ PathType.Measurement.serialize(buffer);
+ ReadWriteIOUtils.write(path.getMeasurement(), buffer);
+ } else if (path instanceof AlignedPath) {
+ PathType.Aligned.serialize(buffer);
+ AlignedPath alignedPath = (AlignedPath) path;
+ ReadWriteIOUtils.write(alignedPath.getMeasurementList().size(), buffer);
+ for (String measurement : alignedPath.getMeasurementList()) {
+ ReadWriteIOUtils.write(measurement, buffer);
+ }
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/CountDevicesStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/CountDevicesStatement.java
index 84ef6b7bdb9..06d32f7c9b5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/CountDevicesStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/CountDevicesStatement.java
@@ -21,12 +21,28 @@ package
org.apache.iotdb.db.queryengine.plan.statement.metadata;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.queryengine.plan.statement.component.WhereCondition;
public class CountDevicesStatement extends CountStatement {
+
+ WhereCondition timeCondition;
+
public CountDevicesStatement(PartialPath partialPath) {
super(partialPath);
}
+ public void setTimeCondition(WhereCondition timeCondition) {
+ this.timeCondition = timeCondition;
+ }
+
+ public WhereCondition getTimeCondition() {
+ return timeCondition;
+ }
+
+ public boolean hasTimeCondition() {
+ return timeCondition != null;
+ }
+
@Override
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitCountDevices(this, context);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/CountTimeSeriesStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/CountTimeSeriesStatement.java
index 50f7c49e531..c7c259030a9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/CountTimeSeriesStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/CountTimeSeriesStatement.java
@@ -22,11 +22,14 @@ package
org.apache.iotdb.db.queryengine.plan.statement.metadata;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.filter.SchemaFilter;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.queryengine.plan.statement.component.WhereCondition;
public class CountTimeSeriesStatement extends CountStatement {
private SchemaFilter schemaFilter;
+ private WhereCondition timeCondition;
+
public CountTimeSeriesStatement(PartialPath partialPath) {
super(partialPath);
}
@@ -39,6 +42,18 @@ public class CountTimeSeriesStatement extends CountStatement
{
this.schemaFilter = schemaFilter;
}
+ public void setTimeCondition(WhereCondition timeCondition) {
+ this.timeCondition = timeCondition;
+ }
+
+ public boolean hasTimeCondition() {
+ return timeCondition != null;
+ }
+
+ public WhereCondition getTimeCondition() {
+ return timeCondition;
+ }
+
@Override
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitCountTimeSeries(this, context);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/ShowDevicesStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/ShowDevicesStatement.java
index 1911d1644c1..75f33c7b4a1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/ShowDevicesStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/ShowDevicesStatement.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.queryengine.plan.statement.metadata;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.filter.SchemaFilter;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.queryengine.plan.statement.component.WhereCondition;
import java.util.Collections;
import java.util.List;
@@ -38,6 +39,7 @@ public class ShowDevicesStatement extends ShowStatement {
private final PartialPath pathPattern;
private boolean hasSgCol;
private SchemaFilter schemaFilter;
+ private WhereCondition timeCondition;
public ShowDevicesStatement(PartialPath pathPattern) {
super();
@@ -64,6 +66,18 @@ public class ShowDevicesStatement extends ShowStatement {
return hasSgCol;
}
+ public void setTimeCondition(WhereCondition timeCondition) {
+ this.timeCondition = timeCondition;
+ }
+
+ public WhereCondition getTimeCondition() {
+ return timeCondition;
+ }
+
+ public boolean hasTimeCondition() {
+ return timeCondition != null;
+ }
+
@Override
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitShowDevices(this, context);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/ShowTimeSeriesStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/ShowTimeSeriesStatement.java
index ca7fddb8764..82a389d6224 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/ShowTimeSeriesStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/ShowTimeSeriesStatement.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.queryengine.plan.statement.metadata;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.filter.SchemaFilter;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.queryengine.plan.statement.component.WhereCondition;
import java.util.Collections;
import java.util.List;
@@ -37,11 +38,10 @@ import java.util.List;
public class ShowTimeSeriesStatement extends ShowStatement {
private final PartialPath pathPattern;
-
private SchemaFilter schemaFilter;
-
// if is true, the result will be sorted according to the inserting
frequency of the time series
private final boolean orderByHeat;
+ private WhereCondition timeCondition;
public ShowTimeSeriesStatement(PartialPath pathPattern, boolean orderByHeat)
{
super();
@@ -65,6 +65,18 @@ public class ShowTimeSeriesStatement extends ShowStatement {
return orderByHeat;
}
+ public void setTimeCondition(WhereCondition timeCondition) {
+ this.timeCondition = timeCondition;
+ }
+
+ public WhereCondition getTimeCondition() {
+ return timeCondition;
+ }
+
+ public boolean hasTimeCondition() {
+ return timeCondition != null;
+ }
+
@Override
public List<PartialPath> getPaths() {
return Collections.singletonList(pathPattern);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/RegionScanPlanningTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/RegionScanPlanningTest.java
new file mode 100644
index 00000000000..a1c68d0cc30
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/RegionScanPlanningTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RegionMergeNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode;
+
+import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RegionScanPlanningTest {
+
+ private static final Set<PartialPath> devicePaths = new HashSet<>();
+ private static final Set<PartialPath> path = new HashSet<>();
+
+ static {
+ try {
+ devicePaths.add(new PartialPath(new PlainDeviceID("root.sg.d1")));
+ devicePaths.add(new PartialPath(new PlainDeviceID("root.sg.d22")));
+ devicePaths.add(new PartialPath(new PlainDeviceID("root.sg.d333")));
+ devicePaths.add(new PartialPath(new PlainDeviceID("root.sg.d4444")));
+ devicePaths.add(new PartialPath(new PlainDeviceID("root.sg.d55555")));
+ devicePaths.add(new PartialPath(new PlainDeviceID("root.sg.d666666")));
+
+ path.add(new MeasurementPath("root.sg.d1.s1"));
+ path.add(new MeasurementPath("root.sg.d1.s2"));
+ path.add(new MeasurementPath("root.sg.d22.s1"));
+ path.add(new MeasurementPath("root.sg.d22.s2"));
+ path.add(new MeasurementPath("root.sg.d333.s1"));
+ path.add(new MeasurementPath("root.sg.d333.s2"));
+ path.add(new MeasurementPath("root.sg.d4444.s1"));
+ path.add(new MeasurementPath("root.sg.d4444.s2"));
+ path.add(new MeasurementPath("root.sg.d55555.s1"));
+ path.add(new MeasurementPath("root.sg.d55555.s2"));
+ path.add(new MeasurementPath("root.sg.d666666.s1"));
+ path.add(new MeasurementPath("root.sg.d666666.s2"));
+ } catch (IllegalPathException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testShowDevicesWithTimeCondition() {
+ QueryId queryId = new QueryId("test");
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new
TEndPoint());
+
+ String sql = "show devices where time > 1000";
+ Analysis analysis = Util.analyze(sql, context);
+ PlanNode logicalPlanNode = Util.genLogicalPlan(analysis, context);
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context,
logicalPlanNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(4, plan.getFragments().size());
+
+ PlanNode f1Root =
plan.getInstances().get(0).getFragment().getPlanNodeTree();
+ assertTrue(f1Root instanceof IdentitySinkNode);
+ f1Root = f1Root.getChildren().get(0);
+ assertTrue(f1Root instanceof RegionMergeNode);
+ assertEquals(4, f1Root.getChildren().size());
+ assertTrue(f1Root.getChildren().get(0) instanceof DeviceRegionScanNode);
+ Set<PartialPath> targetPaths =
+ new HashSet<>(((DeviceRegionScanNode)
f1Root.getChildren().get(0)).getDevicePaths());
+ for (int i = 1; i < 4; i++) {
+ PlanNode fRoot =
plan.getInstances().get(i).getFragment().getPlanNodeTree();
+ assertTrue(fRoot instanceof IdentitySinkNode);
+
+ for (PlanNode child : fRoot.getChildren()) {
+ assertTrue(child instanceof DeviceRegionScanNode);
+ targetPaths.addAll(((DeviceRegionScanNode) child).getDevicePaths());
+ }
+ }
+ assertEquals(devicePaths, targetPaths);
+ }
+
+ @Test
+ public void testShowTimeseriesWithTimeCondition() throws
IllegalPathException {
+ QueryId queryId = new QueryId("test");
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new
TEndPoint());
+
+ String sql = "show timeseries where time > 1000";
+ Analysis analysis = Util.analyze(sql, context);
+ PlanNode logicalPlanNode = Util.genLogicalPlan(analysis, context);
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context,
logicalPlanNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(4, plan.getFragments().size());
+
+ PlanNode f1Root =
plan.getInstances().get(0).getFragment().getPlanNodeTree();
+ assertTrue(f1Root instanceof IdentitySinkNode);
+ f1Root = f1Root.getChildren().get(0);
+ assertTrue(f1Root instanceof RegionMergeNode);
+ assertEquals(4, f1Root.getChildren().size());
+ assertTrue(f1Root.getChildren().get(0) instanceof
TimeseriesRegionScanNode);
+ Set<PartialPath> targetDevicePaths =
+ new HashSet<>(((TimeseriesRegionScanNode)
f1Root.getChildren().get(0)).getDevicePaths());
+ Set<PartialPath> targetMeasurementPaths =
+ new HashSet<>(
+ ((TimeseriesRegionScanNode)
f1Root.getChildren().get(0)).getMeasurementPath());
+ for (int i = 1; i < 4; i++) {
+ PlanNode fRoot =
plan.getInstances().get(i).getFragment().getPlanNodeTree();
+ assertTrue(fRoot instanceof IdentitySinkNode);
+
+ for (PlanNode child : fRoot.getChildren()) {
+ assertTrue(child instanceof TimeseriesRegionScanNode);
+ targetDevicePaths.addAll(((TimeseriesRegionScanNode)
child).getDevicePaths());
+ targetMeasurementPaths.addAll(((TimeseriesRegionScanNode)
child).getMeasurementPath());
+ }
+ }
+ assertEquals(devicePaths, targetDevicePaths);
+ assertEquals(path, targetMeasurementPaths);
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/RegionScanLogicalPlannerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/RegionScanLogicalPlannerTest.java
new file mode 100644
index 00000000000..0a582704775
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/RegionScanLogicalPlannerTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.logical;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.common.TimeseriesSchemaInfo;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.iotdb.db.queryengine.plan.planner.logical.LogicalPlannerTestUtil.parseSQLToPlanNode;
+
+public class RegionScanLogicalPlannerTest {
+
+ private static Map<PartialPath, Map<PartialPath, List<TimeseriesSchemaInfo>>>
+ deviceToTimeseriesSchemaInfoMap;
+
+ private static Map<PartialPath, Map<PartialPath, List<TimeseriesSchemaInfo>>>
+ getDeviceToTimeseriesSchemaInfoMap() throws IllegalPathException {
+
+ if (deviceToTimeseriesSchemaInfoMap != null) {
+ return deviceToTimeseriesSchemaInfoMap;
+ }
+
+ deviceToTimeseriesSchemaInfoMap = new HashMap<>();
+ Map<PartialPath, List<TimeseriesSchemaInfo>> timeseriesSchemaInfoMap = new
HashMap<>();
+ timeseriesSchemaInfoMap.put(
+ new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+ Collections.singletonList(
+ new TimeseriesSchemaInfo("INT32", "PLAIN", "LZ4",
"{\"key1\":\"value1\"}", "", "")));
+ timeseriesSchemaInfoMap.put(
+ new MeasurementPath("root.sg.d1.s2", TSDataType.DOUBLE),
+ Collections.singletonList(
+ new TimeseriesSchemaInfo("DOUBLE", "PLAIN", "LZ4",
"{\"key1\":\"value1\"}", "", "")));
+ timeseriesSchemaInfoMap.put(
+ new MeasurementPath("root.sg.d1.s3", TSDataType.BOOLEAN),
+ Collections.singletonList(
+ new TimeseriesSchemaInfo("BOOLEAN", "PLAIN", "LZ4",
"{\"key1\":\"value2\"}", "", "")));
+ deviceToTimeseriesSchemaInfoMap.put(
+ new PartialPath(new PlainDeviceID("root.sg.d1")),
timeseriesSchemaInfoMap);
+
+ Map<PartialPath, List<TimeseriesSchemaInfo>> timeseriesSchemaInfoMap2 =
new HashMap<>();
+ timeseriesSchemaInfoMap2.put(
+ new MeasurementPath("root.sg.d2.s1", TSDataType.INT32),
+ Collections.singletonList(
+ new TimeseriesSchemaInfo("INT32", "PLAIN", "LZ4",
"{\"key1\":\"value1\"}", "", "")));
+ timeseriesSchemaInfoMap2.put(
+ new MeasurementPath("root.sg.d2.s2", TSDataType.DOUBLE),
+ Collections.singletonList(
+ new TimeseriesSchemaInfo("DOUBLE", "PLAIN", "LZ4",
"{\"key1\":\"value1\"}", "", "")));
+ timeseriesSchemaInfoMap2.put(
+ new MeasurementPath("root.sg.d2.s4", TSDataType.TEXT),
+ Collections.singletonList(
+ new TimeseriesSchemaInfo("TEXT", "PLAIN", "LZ4",
"{\"key2\":\"value1\"}", "", "")));
+ deviceToTimeseriesSchemaInfoMap.put(
+ new PartialPath(new PlainDeviceID("root.sg.d2")),
timeseriesSchemaInfoMap2);
+
+ List<String> schemas = new ArrayList<>();
+ schemas.add("s1");
+ schemas.add("s2");
+ List<TimeseriesSchemaInfo> timeseriesSchemaInfoList = new ArrayList<>();
+ Map<PartialPath, List<TimeseriesSchemaInfo>> timeseriesSchemaInfoMap3 =
new HashMap<>();
+ timeseriesSchemaInfoList.add(
+ new TimeseriesSchemaInfo("INT32", "PLAIN", "LZ4",
"{\"key1\":\"value1\"}", "", ""));
+ timeseriesSchemaInfoList.add(
+ new TimeseriesSchemaInfo("DOUBLE", "PLAIN", "LZ4",
"{\"key1\":\"value1\"}", "", ""));
+ timeseriesSchemaInfoMap3.put(
+ new AlignedPath("root.sg.d2.a", schemas, Collections.emptyList()),
+ timeseriesSchemaInfoList);
+ deviceToTimeseriesSchemaInfoMap.put(
+ new PartialPath(new PlainDeviceID("root.sg.d2.a")),
timeseriesSchemaInfoMap3);
+
+ return deviceToTimeseriesSchemaInfoMap;
+ }
+
+ @Test
+ public void testShowDevicesWithTimeCondition() throws IllegalPathException {
+ String sql = "show devices where time > 1000";
+
+ QueryId queryId = new QueryId("test");
+ // fake initResultNodeContext()
+ queryId.genPlanNodeId();
+
+ Map<PartialPath, Boolean> devicePathsToAligned = new HashMap<>();
+ devicePathsToAligned.put(new PartialPath(new PlainDeviceID("root.sg.d1")),
false);
+ devicePathsToAligned.put(new PartialPath(new PlainDeviceID("root.sg.d2")),
false);
+ devicePathsToAligned.put(new PartialPath(new
PlainDeviceID("root.sg.d2.a")), true);
+
+ DeviceRegionScanNode regionScanNode =
+ new DeviceRegionScanNode(queryId.genPlanNodeId(),
devicePathsToAligned, false, null);
+
+ PlanNode actualPlan = parseSQLToPlanNode(sql);
+ Assert.assertEquals(actualPlan, regionScanNode);
+ }
+
+ @Test
+ public void testShowDevicesWithTimeConditionWithLimitOffset() throws
IllegalPathException {
+ String sql = "show devices where time > 1000 limit 20 offset 10";
+
+ QueryId queryId = new QueryId("test");
+ // fake initResultNodeContext()
+ queryId.genPlanNodeId();
+
+ Map<PartialPath, Boolean> devicePathsToAligned = new HashMap<>();
+ devicePathsToAligned.put(new PartialPath(new PlainDeviceID("root.sg.d1")),
false);
+ devicePathsToAligned.put(new PartialPath(new PlainDeviceID("root.sg.d2")),
false);
+ devicePathsToAligned.put(new PartialPath(new
PlainDeviceID("root.sg.d2.a")), true);
+
+ DeviceRegionScanNode regionScanNode =
+ new DeviceRegionScanNode(queryId.genPlanNodeId(),
devicePathsToAligned, false, null);
+
+ LimitNode limitNode = new LimitNode(queryId.genPlanNodeId(), 20);
+ limitNode.addChild(regionScanNode);
+ OffsetNode offsetNode = new OffsetNode(queryId.genPlanNodeId(), 10);
+ offsetNode.addChild(limitNode);
+
+ PlanNode actualPlan = parseSQLToPlanNode(sql);
+ Assert.assertEquals(actualPlan, offsetNode);
+ }
+
+ @Test
+ public void testCountDevicesWithTimeConditionWithLimitOffset() throws
IllegalPathException {
+ String sql = "count devices where time > 1000";
+
+ QueryId queryId = new QueryId("test");
+ // fake initResultNodeContext()
+ queryId.genPlanNodeId();
+
+ Map<PartialPath, Boolean> devicePathsToAligned = new HashMap<>();
+ devicePathsToAligned.put(new PartialPath(new PlainDeviceID("root.sg.d1")),
false);
+ devicePathsToAligned.put(new PartialPath(new PlainDeviceID("root.sg.d2")),
false);
+ devicePathsToAligned.put(new PartialPath(new
PlainDeviceID("root.sg.d2.a")), true);
+
+ DeviceRegionScanNode regionScanNode =
+ new DeviceRegionScanNode(queryId.genPlanNodeId(),
devicePathsToAligned, true, null);
+
+ PlanNode actualPlan = parseSQLToPlanNode(sql);
+ Assert.assertEquals(actualPlan, regionScanNode);
+ }
+
+ @Test
+ public void testCountTimeseriesWithTimeConditionWithLimitOffset() throws
IllegalPathException {
+ String sql = "count timeseries where time > 1000";
+
+ QueryId queryId = new QueryId("test");
+ // fake initResultNodeContext()
+ queryId.genPlanNodeId();
+
+ TimeseriesRegionScanNode regionScanNode =
+ new TimeseriesRegionScanNode(
+ queryId.genPlanNodeId(), getDeviceToTimeseriesSchemaInfoMap(),
true, null);
+
+ PlanNode actualPlan = parseSQLToPlanNode(sql);
+ Assert.assertEquals(actualPlan, regionScanNode);
+ }
+
+ @Test
+ public void serializeDeserializeTest() throws IllegalPathException {
+
+ TimeseriesRegionScanNode timeseriesRegionScanNode =
+ new TimeseriesRegionScanNode(
+ new PlanNodeId("timeseries_test_id"),
getDeviceToTimeseriesSchemaInfoMap(), true, null);
+
+ ByteBuffer buffer = ByteBuffer.allocate(10240);
+ timeseriesRegionScanNode.serialize(buffer);
+ buffer.flip();
+ Assert.assertEquals(timeseriesRegionScanNode,
PlanNodeType.deserialize(buffer));
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
index 7f0ff677600..5b07e4b0615 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
@@ -115,6 +115,12 @@ public class AlignedPath extends PartialPath {
schemaList = new ArrayList<>();
}
+ public AlignedPath(String[] nodes, List<String> measurementList) {
+ super(nodes);
+ this.measurementList = measurementList;
+ this.schemaList = new ArrayList<>();
+ }
+
/**
* This method is used by last read. Comparator<Binary> and
Comparator<String> behaves differently
* and that is why we use Comparator<Binary> here.
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
index 10f7dd4d196..20b50f237ce 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
@@ -100,6 +100,10 @@ public class MeasurementPath extends PartialPath {
this.measurementSchema = schema;
}
+ public MeasurementPath(String[] nodes) {
+ super(nodes);
+ }
+
@Override
public IMeasurementSchema getMeasurementSchema() {
return measurementSchema;