This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f514abd [IOTDB-536] Process about nonExistMeasurement and ConstantMeasurement in AlignByDevice is complex and not intuitive (#874)
f514abd is described below
commit f514abd99a99a21ee60279b799a95290036590a6
Author: Jialin Qiao <[email protected]>
AuthorDate: Thu Mar 5 19:40:50 2020 +0800
[IOTDB-536] Process about nonExistMeasurement and ConstantMeasurement in AlignByDevice is complex and not intuitive (#874)
---
.../5-DataQuery/7-AlignByDeviceQuery.md | 203 +++++++++++++++++++++
.../apache/iotdb/db/qp/constant/SQLConstant.java | 2 +-
.../db/qp/physical/crud/AlignByDevicePlan.java | 103 +++--------
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 41 +++--
.../db/query/dataset/AlignByDeviceDataSet.java | 98 +++-------
.../org/apache/iotdb/db/service/TSServiceImpl.java | 133 ++++----------
6 files changed, 321 insertions(+), 259 deletions(-)
diff --git a/docs/Documentation-CHN/SystemDesign/5-DataQuery/7-AlignByDeviceQuery.md b/docs/Documentation-CHN/SystemDesign/5-DataQuery/7-AlignByDeviceQuery.md
new file mode 100644
index 0000000..25042e2
--- /dev/null
+++ b/docs/Documentation-CHN/SystemDesign/5-DataQuery/7-AlignByDeviceQuery.md
@@ -0,0 +1,203 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+# Align-by-Device Query
+
+The table structure of an AlignByDevicePlan, i.e., of an align-by-device query, is:
+
+| Time | Device | sensor1 | sensor2 | sensor3 | ... |
+| ---- | ------ | ------- | ------- | ------- | --- |
+| | | | | | |
+
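+## Design Principle
+
+An align-by-device query works in three steps. It first computes the measurements and the filter condition of each device in the query. It then executes the query separately for each device. Finally, it assembles the per-device results into one result set and returns it.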
+
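+### Key Fields of AlignByDevicePlan
+
+First, the meanings of some important fields in AlignByDevicePlan:
+- `List<String> measurements`: the list of measurements that appear in the query.
+- `Map<Path, TSDataType> dataTypeMapping`: inherited from the base class QueryPlan. When the execution paths of each device are computed, it supplies the data types of the paths in this query.
+- `Map<String, Set<String>> deviceToMeasurementsMap`, `Map<String, IExpression> deviceToFilterMap`: these store the measurements and the filter condition of each device, respectively.
+- `Map<String, TSDataType> measurementDataTypeMap`: AlignByDevicePlan requires sensors with the same name on different devices to have the same data type. This field is a `measurementName -> dataType` map used to verify that consistency. For example, `root.sg.d1.s1` and `root.sg.d2.s1` must have the same data type.
+- `enum MeasurementType`: records the three kinds of measurement. A measurement that exists in no device is of type `NonExist`. A measurement enclosed in single or double quotes is of type `Constant`. A measurement that exists is of type `Exist`.
+- `Map<String, MeasurementType> measurementTypeMap`: a `measurementName -> measurementType` map that records the type of every measurement in the query.
+- groupByPlan, fillQueryPlan, aggregationPlan: to avoid redundancy, these three plans are defined as subclasses of RawDataQueryPlan, and AlignByDevicePlan holds them as fields. If the query is one of these three kinds, the corresponding field is assigned and stored.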
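The check that `measurementDataTypeMap` supports can be sketched as below. This is a minimal sketch, not the IoTDB code. A local `DataType` enum stands in for `TSDataType`, and an `IllegalStateException` stands in for `QueryProcessException`. The class name `DataTypeConsistencySketch` is hypothetical. The first device that reports a measurement fixes its type, and any later device with a different type fails the query.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: DataType stands in for TSDataType, and IllegalStateException
// stands in for QueryProcessException.
class DataTypeConsistencySketch {

  enum DataType { BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT }

  // measurementName -> dataType, playing the role of measurementDataTypeMap
  private final Map<String, DataType> measurementDataTypeMap = new HashMap<>();

  /** Records the data type of a measurement, failing if an earlier device disagrees. */
  void check(String measurement, DataType dataType) {
    DataType previous = measurementDataTypeMap.putIfAbsent(measurement, dataType);
    if (previous != null && previous != dataType) {
      throw new IllegalStateException("The data types of the same measurement column should be "
          + "the same across devices: " + measurement + " is both " + previous + " and " + dataType);
    }
  }

  DataType typeOf(String measurement) {
    return measurementDataTypeMap.get(measurement);
  }
}
```

For instance, registering `s1` as `INT32` for `root.sg.d1` and again for `root.sg.d2` passes. Registering `s1` as `DOUBLE` afterwards throws. The types here are assumed for illustration.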
+
+Before going into the implementation, here is a fairly comprehensive example. The explanation below refers to it throughout.
+
+```sql
+SELECT s1, "1", *, s2, s5 FROM root.sg.d1, root.sg.* WHERE time = 1 AND s1 < 25 ALIGN BY DEVICE
+```
+
+The time series in the system are:
+
+- root.sg.d1.s1
+- root.sg.d1.s2
+- root.sg.d2.s1
+
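+The storage group `root.sg` contains two devices, d1 and d2. Device d1 has two sensors, s1 and s2, while d2 has only s1. Sensor s1 has the same data type on both devices.
+
+Each stage is explained below: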
+
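+### Logical Plan Generation
+
+- org.apache.iotdb.db.qp.Planner
+
+Unlike a raw data query, an align-by-device query does not concatenate the suffix paths of the SELECT and WHERE clauses at this stage. Instead, the measurements and filter condition of each device are computed later, when the physical plan is generated. The only work done here for align-by-device is therefore optimizing the filter condition in the WHERE clause.
+
+Filter optimization has three parts: removing NOT, converting to disjunctive normal form (DNF), and merging filter conditions on the same path. The corresponding optimizers are RemoveNotOptimizer, DnfFilterOptimizer and MergeSingleFilterOptimizer. For details, see [Planner](/#/SystemDesign/progress/chap2/sec2).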
+
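+### Physical Plan Generation
+
+- org.apache.iotdb.db.qp.strategy.PhysicalGenerator
+
+After the logical plan is generated, the `transformToPhysicalPlan()` method of PhysicalGenerator is called to convert it into a physical plan. For an align-by-device query, the main logic of this method is implemented in `transformQuery()`.
+
+**The main work of this stage is to generate the** `AlignByDevicePlan` **for the query and fill in its fields.**
+
+First, the meanings of some important variables in `transformQuery()` (fields shared with AlignByDevicePlan are described above):
+
+- prefixPaths, suffixPaths: the former are the prefix paths in the FROM clause, `[root.sg.d1, root.sg.*]` in the example. The latter are the suffix paths in the SELECT clause, `[s1, "1", *, s2, s5]` in the example.
+- devices: the device list obtained by removing wildcards from the prefix paths and deduplicating the devices, `[root.sg.d1, root.sg.d2]` in the example.
+- measurementSetOfGivenSuffix: an intermediate variable recording the measurements that a given suffix resolves to. In the example, for suffix \*, `measurementSetOfGivenSuffix = {s1,s2}`; for suffix s1, `measurementSetOfGivenSuffix = {s1}`.
+
+Next, how AlignByDevicePlan is computed: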
+
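+1. Check whether the query is a groupByPlan, fillQueryPlan or aggregationPlan. If it is, assign the corresponding field and change the query type of `AlignByDevicePlan` accordingly.
+2. Iterate over the SELECT suffix paths. For each suffix path, an intermediate variable `measurementSetOfGivenSuffix` records all measurements the suffix resolves to. If the suffix path starts with a single or double quote, add it directly to `measurements` and record its type as `Constant`.
+3. Otherwise, concatenate each device with the suffix path to obtain full paths. If a concatenated path does not exist, check whether the measurement exists in other devices. If it exists in none so far, it is provisionally marked `NonExist`. If a later device turns out to have the measurement, `NonExist` is overwritten with `Exist`.
+4. If the concatenated path exists, the measurement is of type `Exist`, and its data type must be checked for consistency. On a conflict an error is returned. Otherwise the measurement is recorded, and `measurementSetOfGivenSuffix`, `deviceToMeasurementsMap`, etc. are updated.
+5. When the loop for one suffix ends, append that iteration's `measurementSetOfGivenSuffix` to `measurements`. When the whole loop ends, assign the collected variables to AlignByDevicePlan. The `measurements` list obtained here is not deduplicated; deduplication happens when the `ColumnHeader` is built.
+6. Finally, call `concatFilterByDevice()` to compute `deviceToFilterMap`, i.e., the filter of each device after the device is concatenated into its paths.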
+
+```java
+/**
+ * Input: the deduplicated devices list and the un-concatenated FilterOperator.
+ * Output: the concatenated deviceToFilterMap, recording the filter of each device.
+ */
+Map<String, IExpression> concatFilterByDevice(List<String> devices,
+    FilterOperator operator)
+```
+
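+The main logic of `concatFilterByDevice()` lives in `concatFilterPath()`:
+
+`concatFilterPath()` traverses the un-concatenated FilterOperator binary tree. For a leaf node, it takes the leaf's path. A path starting with `time` or `root` is left unchanged; otherwise the device name is prepended to the path and the result is returned. For a non-leaf node, it processes all of the node's children recursively. In the example, the concatenated filter of device d1 is `time = 1 AND root.sg.d1.s1 < 25`, and that of device d2 is `time = 1 AND root.sg.d2.s1 < 25`.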
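The recursion above can be sketched as follows. This sketch does not use IoTDB's `FilterOperator`. It uses a hypothetical two-class filter tree instead: `Leaf` for `path op value` and `Node` for AND/OR. The class name `FilterConcatSketch` is also made up. Its `toString()` adds no parentheses, so it only suits flat filters like the one in the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of concatFilterByDevice()/concatFilterPath() on a toy filter tree.
class FilterConcatSketch {

  static abstract class Filter {}

  // A leaf such as "s1 < 25".
  static final class Leaf extends Filter {
    final String path;
    final String op;
    final String value;

    Leaf(String path, String op, String value) {
      this.path = path;
      this.op = op;
      this.value = value;
    }

    @Override
    public String toString() {
      return path + " " + op + " " + value;
    }
  }

  // An inner node joining its children with AND or OR (printed without parentheses).
  static final class Node extends Filter {
    final String relation;
    final List<Filter> children;

    Node(String relation, List<Filter> children) {
      this.relation = relation;
      this.children = children;
    }

    @Override
    public String toString() {
      List<String> parts = new ArrayList<>();
      for (Filter child : children) {
        parts.add(child.toString());
      }
      return String.join(" " + relation + " ", parts);
    }
  }

  /** Prefixes every leaf path with the device, except paths starting with time or root. */
  static Filter concatFilterPath(String device, Filter filter) {
    if (filter instanceof Leaf) {
      Leaf leaf = (Leaf) filter;
      if (leaf.path.startsWith("time") || leaf.path.startsWith("root")) {
        return leaf;
      }
      return new Leaf(device + "." + leaf.path, leaf.op, leaf.value);
    }
    Node node = (Node) filter;
    List<Filter> children = new ArrayList<>();
    for (Filter child : node.children) {
      children.add(concatFilterPath(device, child)); // recurse into every child
    }
    return new Node(node.relation, children);
  }

  static Map<String, Filter> concatFilterByDevice(List<String> devices, Filter filter) {
    Map<String, Filter> deviceToFilterMap = new LinkedHashMap<>();
    for (String device : devices) {
      deviceToFilterMap.put(device, concatFilterPath(device, filter));
    }
    return deviceToFilterMap;
  }
}
```

For `time = 1 AND s1 < 25` and devices `root.sg.d1` and `root.sg.d2`, it yields the two per-device filters given in the paragraph above.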
+
+For the example, this stage computes the following variables:
+
+- measurement list `measurements`: `[s1, "1", s1, s2, s2, s5]`
+- measurement types `measurementTypeMap`:
+ - `s1 -> Exist`
+ - `s2 -> Exist`
+ - `"1" -> Constant`
+ - `s5 -> NonExist`
+- measurements of each device `deviceToMeasurementsMap`:
+ - `root.sg.d1 -> s1, s2`
+ - `root.sg.d2 -> s1`
+- filter condition of each device `deviceToFilterMap`:
+ - `root.sg.d1 -> time = 1 AND root.sg.d1.s1 < 25`
+ - `root.sg.d2 -> time = 1 AND root.sg.d2.s1 < 25`
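The classification loop (steps 2 to 5 of the computation) can be sketched as follows. This is a simplified illustration, not the `transformQuery()` code:

- A hypothetical `schema` map, from each device to its existing measurements, replaces the metadata lookup.
- `*` is the only wildcard handled.
- Aggregations and the data type check are omitted.
- The class name is made up.

Run on the example, it reproduces the `measurements`, `measurementTypeMap` and `deviceToMeasurementsMap` listed above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified sketch of the measurement classification in transformQuery().
class MeasurementClassifierSketch {

  enum MeasurementType { Exist, NonExist, Constant }

  final List<String> measurements = new ArrayList<>(); // not deduplicated
  final Map<String, MeasurementType> measurementTypeMap = new HashMap<>();
  final Map<String, Set<String>> deviceToMeasurementsMap = new LinkedHashMap<>();

  /** schema maps each device to the measurements that exist under it (assumed input). */
  void classify(List<String> devices, List<String> suffixes, Map<String, List<String>> schema) {
    for (String suffix : suffixes) {
      // Step 2: a quoted suffix is a Constant column.
      if (suffix.startsWith("'") || suffix.startsWith("\"")) {
        measurements.add(suffix);
        measurementTypeMap.put(suffix, MeasurementType.Constant);
        continue;
      }
      Set<String> measurementSetOfGivenSuffix = new LinkedHashSet<>();
      for (String device : devices) {
        List<String> existing = schema.getOrDefault(device, Collections.emptyList());
        List<String> actual = new ArrayList<>();
        if (suffix.equals("*")) {
          actual.addAll(existing);
        } else if (existing.contains(suffix)) {
          actual.add(suffix);
        }
        // Step 3: provisionally NonExist, but never downgrade an Exist measurement.
        // (A "*" that matches nothing adds no column in this sketch.)
        if (actual.isEmpty() && !suffix.equals("*")
            && measurementSetOfGivenSuffix.add(suffix)
            && measurementTypeMap.get(suffix) != MeasurementType.Exist) {
          measurementTypeMap.put(suffix, MeasurementType.NonExist);
        }
        // Step 4: the path exists, so the measurement is Exist (overriding NonExist).
        for (String m : actual) {
          measurementSetOfGivenSuffix.add(m);
          measurementTypeMap.put(m, MeasurementType.Exist);
          deviceToMeasurementsMap.computeIfAbsent(device, d -> new LinkedHashSet<>()).add(m);
        }
      }
      // Step 5: a set within one suffix, a list across suffixes.
      measurements.addAll(measurementSetOfGivenSuffix);
    }
  }
}
```

Using a `LinkedHashSet` per suffix and a `List` across suffixes is what leaves `s1` and `s2` duplicated in `measurements` until the header is built.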
+
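+### Building the Column Header (ColumnHeader)
+
+- org.apache.iotdb.db.service.TSServiceImpl
+
+Once the physical plan is generated, the `executeQueryStatement()` method of TSServiceImpl can be executed to generate and return the result set. Its first step is building the column header.
+
+For an align-by-device query, `TSServiceImpl.getQueryColumnHeaders()` dispatches on the query type to `TSServiceImpl.getAlignByDeviceQueryHeaders()`, which builds the header.
+
+`getAlignByDeviceQueryHeaders()` is declared as follows: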
+
+```java
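+/**
+ * Input: the physical plan being executed (AlignByDevicePlan), plus the output column
+ *        names respColumns and their data types columnTypes, to be filled in.
+ * Output: the computed column names in respColumns and data types in columnTypes.
+ */
+private void getAlignByDeviceQueryHeaders(
+    AlignByDevicePlan plan, List<String> respColumns, List<String> columnTypes)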
+```
+
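+The implementation works as follows:
+
+1. First add the `Device` column, whose data type is `TEXT`.
+2. Iterate over the non-deduplicated `measurements` list and check the type of each measurement. For an `Exist` measurement, take its data type from `measurementDataTypeMap`; for the other two types, use `TEXT`. Then add the measurement and its data type to the header.
+3. Deduplicate `measurements` using the intermediate variable `deduplicatedMeasurements`.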
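The three header-building steps can be sketched as follows. This is a minimal sketch under stated assumptions, not the `TSServiceImpl` code. Data types are written as plain strings, a `LinkedHashSet` performs the order-preserving deduplication, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of getAlignByDeviceQueryHeaders(); data types are plain strings.
class HeaderSketch {

  enum MeasurementType { Exist, NonExist, Constant }

  /**
   * Appends the Device column and every measurement (duplicates kept) to respColumns and
   * columnTypes, and returns the deduplicated measurements in their original order.
   */
  static List<String> buildHeader(List<String> measurements,
      Map<String, MeasurementType> measurementTypeMap,
      Map<String, String> measurementDataTypeMap,
      List<String> respColumns, List<String> columnTypes) {
    // Step 1: the Device column is always TEXT.
    respColumns.add("Device");
    columnTypes.add("TEXT");
    Set<String> deduplicatedMeasurements = new LinkedHashSet<>();
    for (String measurement : measurements) {
      // Step 2: Exist columns take their real type; NonExist and Constant are TEXT.
      String type = measurementTypeMap.get(measurement) == MeasurementType.Exist
          ? measurementDataTypeMap.get(measurement)
          : "TEXT";
      respColumns.add(measurement);
      columnTypes.add(type);
      // Step 3: keep only the first occurrence of each measurement.
      deduplicatedMeasurements.add(measurement);
    }
    return new ArrayList<>(deduplicatedMeasurements);
  }
}
```

Given the example's non-deduplicated list `[s1, "1", s1, s2, s2, s5]`, it returns `[s1, "1", s2, s5]`.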
+
+The resulting header is:
+
+| Time | Device | s1 | 1 | s1 | s2 | s2 | s5 |
+| ---- | ------ | --- | --- | --- | --- | --- | --- |
+| | | | | | | | |
+
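+The deduplicated `measurements` is `[s1, "1", s2, s5]`.
+
+### Result Set Generation
+
+After the ColumnHeader is built, the last step is to generate the result set, fill it with results and return it.
+
+#### Result Set Creation
+
+- org.apache.iotdb.db.service.TSServiceImpl
+
+This stage calls `TSServiceImpl.createQueryDataSet()` to create a new result set. The logic is simple: for an AlignByDeviceQuery, it only needs to create an `AlignByDeviceDataSet`, whose constructor copies the parameters of the AlignByDevicePlan into the new result set.
+
+#### Result Set Filling
+
+- org.apache.iotdb.db.utils.QueryDataSetUtils
+
+Next, the results are filled in. An AlignByDeviceQuery calls `TSServiceImpl.fillRpcReturnData()`, which enters `QueryDataSetUtils.convertQueryDataSetByFetchSize()` according to the query type.
+
+The key method that `convertQueryDataSetByFetchSize()` uses to fetch results is the `hasNext()` method of QueryDataSet.
+
+The main logic of `hasNext()` is:
+
+1. If a row offset `rowOffset` is specified, skip that many rows. If the result has fewer rows than the offset, return false.
+2. If a row limit `rowLimit` is specified, compare it with the number of rows already returned. Once that number reaches the limit, return false.
+3. Finally, enter `AlignByDeviceDataSet.hasNextWithoutConstraint()`.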
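The OFFSET/LIMIT handling in `hasNext()` can be sketched as follows. This minimal sketch wraps a plain `Iterator`, which stands in for `hasNextWithoutConstraint()`/`nextWithoutConstraint()`. The class name is hypothetical. Treating a `rowLimit` of 0 as "no limit" is an assumption of this sketch.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical sketch of hasNext()/next() applying OFFSET and LIMIT on an unconstrained source.
class LimitOffsetSketch<T> {
  private final Iterator<T> source; // stands in for the *WithoutConstraint() methods
  private int rowOffset;            // rows still to skip
  private final int rowLimit;       // 0 means "no limit" in this sketch
  private int alreadyReturnedRowNum = 0;

  LimitOffsetSketch(Iterator<T> source, int rowOffset, int rowLimit) {
    this.source = source;
    this.rowOffset = rowOffset;
    this.rowLimit = rowLimit;
  }

  boolean hasNext() {
    // 1. Skip OFFSET rows; fewer rows than the offset means there is no result.
    while (rowOffset > 0) {
      if (!source.hasNext()) {
        return false;
      }
      source.next();
      rowOffset--;
    }
    // 2. Stop once LIMIT rows have been returned.
    if (rowLimit > 0 && alreadyReturnedRowNum >= rowLimit) {
      return false;
    }
    // 3. Otherwise defer to the unconstrained source.
    return source.hasNext();
  }

  T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    alreadyReturnedRowNum++;
    return source.next();
  }
}
```

With rows `1..5`, an offset of 1 and a limit of 2 produce `2, 3`.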
+
+<br>
+
+- org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet
+
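+First, the meanings of the important fields in the result set:
+
+- `deviceIterator`: an align-by-device query essentially computes the measurements and filter condition of each device and then runs the query device by device. This field is the iterator over those devices, and each sub-query takes one device from it.
+- `currentDataSet`: the result set obtained by the current query on a single device.
+
+`hasNextWithoutConstraint()` mainly checks whether the current result set has a next result. If it does not, the method takes the next device and computes the paths, data types and filter condition needed to query it. It then executes the concrete query plan for the query type to obtain a result set, and repeats until no device is left.
+
+The implementation works as follows:
+
+1. First check whether the current result set has been initialized and has a next result. If so, return true directly, meaning `next()` can be called to get the next `RowRecord`. Otherwise mark the result set as uninitialized and go to step 2.
+2. Advance `deviceIterator` to get the device for this round, then look up that device's measurements in `deviceToMeasurementsMap` to obtain `executeColumns`.
+3. Concatenate the current device name with the measurements to compute the query paths, data types and filter condition of the current device, stored in `executePaths`, `tsDataTypes` and `expression` respectively. For an aggregation query, `executeAggregations` must also be computed.
+4. Determine whether the current sub-query is a GroupByQuery, AggregationQuery, FillQuery or RawDataQuery, run the corresponding query and return its result set. For the implementation, see [Raw Data Query](/#/SystemDesign/progress/chap5/sec3), [Aggregation Query](/#/SystemDesign/progress/chap5/sec4) and [Downsampling Query](/#/SystemDesign/progress/chap5/sec5).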
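The device-by-device loop in `hasNextWithoutConstraint()` can be sketched as follows. This is a simplified sketch. The hypothetical `runQueryForDevice` function stands in for steps 2 to 4, which build `executeColumns`, the paths, data types and filter, and then run the sub-query. A `null` check replaces the `curDataSetInitialized` flag, and the class name is made up. Devices whose sub-query returns no rows are skipped.

```java
import java.util.Iterator;
import java.util.function.Function;

// Hypothetical sketch of AlignByDeviceDataSet's device loop; R is the row type.
class DeviceIterationSketch<R> {
  private final Iterator<String> deviceIterator;
  // Stands in for steps 2-4: build the per-device query and return its rows.
  private final Function<String, Iterator<R>> runQueryForDevice;
  private Iterator<R> currentDataSet; // null until the first device is queried
  private String currentDevice;

  DeviceIterationSketch(Iterator<String> deviceIterator,
      Function<String, Iterator<R>> runQueryForDevice) {
    this.deviceIterator = deviceIterator;
    this.runQueryForDevice = runQueryForDevice;
  }

  boolean hasNextWithoutConstraint() {
    // Step 1: the current device still has rows.
    if (currentDataSet != null && currentDataSet.hasNext()) {
      return true;
    }
    // Steps 2-4: move on to the next device until one yields rows.
    while (deviceIterator.hasNext()) {
      currentDevice = deviceIterator.next();
      currentDataSet = runQueryForDevice.apply(currentDevice);
      if (currentDataSet.hasNext()) {
        return true;
      }
    }
    return false;
  }

  R nextWithoutConstraint() {
    return currentDataSet.next();
  }

  String getCurrentDevice() {
    return currentDevice;
  }
}
```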
+
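+Once `hasNextWithoutConstraint()` has initialized the result set and ensured that a next result exists, `QueryDataSet.next()` can be called to get the next `RowRecord`.
+
+The main logic of `next()` is implemented in `AlignByDeviceDataSet.nextWithoutConstraint()`.
+
+`nextWithoutConstraint()` mainly **transforms the time-aligned result set of a single-device query into the device-aligned form**, and returns the transformed `RowRecord`.
+
+The implementation works as follows:
+
+1. First, get the next time-aligned `originRowRecord` from the result set.
+2. Create a new `RowRecord` carrying its timestamp and add the device column to it. Then build a `measurementName -> Field` map, `currentColumnMap`, from `executeColumns` and the fields of the row obtained.
+3. Finally, iterate over the deduplicated `measurements` list and check each measurement's type. For `Exist`, take the field for that measurement name from `currentColumnMap`, or `null` if there is none. For `NonExist`, use `null` directly. For `Constant`, use the measurement name itself as the column value.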
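Steps 2 and 3 of the row transformation can be sketched as follows. Fields are modeled as plain `Object`s, with `null` for an empty cell, rather than IoTDB's `Field`/`RowRecord`. The timestamp is left out, and the class name is hypothetical. Take device d2 in the example, whose only executed column is `s1`. For it, the deduplicated measurements `[s1, "1", s2, s5]` produce `[root.sg.d2, <s1 value>, "1", null, null]`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of nextWithoutConstraint(): time-aligned row -> device-aligned row.
class RowTransformSketch {

  enum MeasurementType { Exist, NonExist, Constant }

  /** Returns [device, value of measurements[0], value of measurements[1], ...]. */
  static List<Object> toDeviceAlignedRow(String device, List<String> executeColumns,
      List<Object> originFields, List<String> measurements,
      Map<String, MeasurementType> measurementTypeMap) {
    // Step 2: map each executed column name to the value the sub-query returned for it.
    Map<String, Object> currentColumnMap = new HashMap<>();
    for (int i = 0; i < originFields.size(); i++) {
      currentColumnMap.put(executeColumns.get(i), originFields.get(i));
    }
    List<Object> row = new ArrayList<>();
    row.add(device); // the Device column
    // Step 3: fill one cell per deduplicated measurement according to its type.
    for (String measurement : measurements) {
      switch (measurementTypeMap.get(measurement)) {
        case Exist:
          row.add(currentColumnMap.get(measurement)); // null if this device lacks it
          break;
        case NonExist:
          row.add(null);
          break;
        case Constant:
          row.add(measurement); // the constant's own text is the value
          break;
      }
    }
    return row;
  }
}
```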
+
+After the transformed `RowRecord` is written to the output data stream, the result set can be returned.
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index 19b5909..3cffb2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -36,7 +36,7 @@ public class SQLConstant {
// forbidding instantiation
}
- public static final String GROUPBY_DEVICE_COLUMN_NAME = "Device";
+ public static final String ALIGNBY_DEVICE_COLUMN_NAME = "Device";
public static final String RESERVED_TIME = "time";
public static final String IS_AGGREGATION = "IS_AGGREGATION";
public static final String NOW_FUNC = "now()";
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index c5700b1..5faf7b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.qp.physical.crud;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -29,25 +28,18 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
public class AlignByDevicePlan extends QueryPlan {
- private List<String> measurements; // e.g. temperature, status, speed
+ private List<String> measurements; // to record result measurement columns, e.g. temperature, status, speed
private Map<String, Set<String>> deviceToMeasurementsMap; // e.g. root.ln.d1 -> temperature
// to check data type consistency for the same name sensor of different devices
- private Map<String, TSDataType> dataTypeConsistencyChecker;
+ private Map<String, TSDataType> measurementDataTypeMap;
private Map<String, IExpression> deviceToFilterMap;
+ // to record different kinds of measurement
+ private Map<String, MeasurementType> measurementTypeMap;
private GroupByPlan groupByPlan;
private FillQueryPlan fillQueryPlan;
private AggregationPlan aggregationPlan;
- // the measurements that do not exist in any device,
- // data type is considered as Boolean. The value is considered as null
- private List<String> notExistMeasurements = new ArrayList<>();
- private List<Integer> positionOfNotExistMeasurements = new ArrayList<>();
- // the measurements that have quotation mark. e.g. "abc",
- // '11', the data type is considered as String and the value is considered is the same with measurement name
- private List<String> constMeasurements = new ArrayList<>();
- private List<Integer> positionOfConstMeasurements = new ArrayList<>();
-
public AlignByDevicePlan() {
super();
}
@@ -69,13 +61,13 @@ public class AlignByDevicePlan extends QueryPlan {
return deviceToMeasurementsMap;
}
- public void setDataTypeConsistencyChecker(
- Map<String, TSDataType> dataTypeConsistencyChecker) {
- this.dataTypeConsistencyChecker = dataTypeConsistencyChecker;
+ public void setMeasurementDataTypeMap(
+ Map<String, TSDataType> measurementDataTypeMap) {
+ this.measurementDataTypeMap = measurementDataTypeMap;
}
- public Map<String, TSDataType> getDataTypeConsistencyChecker() {
- return dataTypeConsistencyChecker;
+ public Map<String, TSDataType> getMeasurementDataTypeMap() {
+ return measurementDataTypeMap;
}
public Map<String, IExpression> getDeviceToFilterMap() {
@@ -86,6 +78,15 @@ public class AlignByDevicePlan extends QueryPlan {
this.deviceToFilterMap = deviceToFilterMap;
}
+ public Map<String, MeasurementType> getMeasurementTypeMap() {
+ return measurementTypeMap;
+ }
+
+ public void setMeasurementTypeMap(
+ Map<String, MeasurementType> measurementTypeMap) {
+ this.measurementTypeMap = measurementTypeMap;
+ }
+
public GroupByPlan getGroupByPlan() {
return groupByPlan;
}
@@ -113,64 +114,14 @@ public class AlignByDevicePlan extends QueryPlan {
this.setOperatorType(Operator.OperatorType.AGGREGATION);
}
- //we use the following algorithm to reproduce the order of measurements that user writes.
- //suppose user writes SELECT 'c1',a1,b1,b2,'c2',a2,a3,'c3',b3,a4,a5 FROM ... where for each a_i
- // column there is at least one device having it, and for each b_i column there is no device
- // having it, and 'c_i' is a const column.
- // Then, measurements is {a1, a2, a3, a4, a5};
- // notExistMeasurements = {b1, b2, b3}, and positionOfNotExistMeasurements is {2, 3, 8};
- // constMeasurements is {'c1', 'c2', 'c3'}, and positionOfConstMeasurements is {0, 4, 7}.
- // When to reproduce the order of measurements. The pseudocode is:
- //<pre>
- // current = 0;
- // if (min(notExist, const) <= current) {
- // pull min_element(notExist, const);
- // } else {
- // pull from measurements;
- // }
- // current ++;
- //</pre>
-
- public void addNotExistMeasurement(int position, String measurement) {
- notExistMeasurements.add(measurement);
- positionOfNotExistMeasurements.add(position);
- }
-
- public void addConstMeasurement(int position, String measurement) {
- constMeasurements.add(measurement);
- positionOfConstMeasurements.add(position);
- }
-
- public List<String> getNotExistMeasurements() {
- return notExistMeasurements;
- }
-
- public void setNotExistMeasurements(List<String> notExistMeasurements) {
- this.notExistMeasurements = notExistMeasurements;
- }
-
- public List<Integer> getPositionOfNotExistMeasurements() {
- return positionOfNotExistMeasurements;
- }
-
- public void setPositionOfNotExistMeasurements(
- List<Integer> positionOfNotExistMeasurements) {
- this.positionOfNotExistMeasurements = positionOfNotExistMeasurements;
- }
-
- public List<String> getConstMeasurements() {
- return constMeasurements;
- }
-
- public void setConstMeasurements(List<String> constMeasurements) {
- this.constMeasurements = constMeasurements;
- }
-
- public List<Integer> getPositionOfConstMeasurements() {
- return positionOfConstMeasurements;
- }
-
- public void setPositionOfConstMeasurements(List<Integer> positionOfConstMeasurements) {
- this.positionOfConstMeasurements = positionOfConstMeasurements;
+ /**
+ * Exist: the measurements which don't belong to NonExist and Constant.
+ * NonExist: the measurements that do not exist in any device, data type is considered as String.
+ * The value is considered as null.
+ * Constant: the measurements that have quotation mark. e.g. "abc",'11'.
+ * The data type is considered as String and the value is the measurement name.
+ */
+ public enum MeasurementType {
+ Exist, NonExist, Constant;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 1f2413e..56f9b80 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -59,13 +59,14 @@ import org.apache.iotdb.db.qp.logical.sys.ShowTimeSeriesOperator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
-import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.CountPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -267,25 +268,24 @@ public class PhysicalGenerator {
List<Path> suffixPaths =
queryOperator.getSelectOperator().getSuffixPaths();
List<String> originAggregations =
queryOperator.getSelectOperator().getAggregations();
+ // to record result measurement columns
List<String> measurements = new ArrayList<>();
Map<String, Set<String>> deviceToMeasurementsMap = new LinkedHashMap<>();
// to check the same measurement of different devices having the same datatype
- Map<String, TSDataType> dataTypeConsistencyChecker = new HashMap<>();
+ Map<String, TSDataType> measurementDataTypeMap = new HashMap<>();
+ Map<String, MeasurementType> measurementTypeMap = new HashMap<>();
List<Path> paths = new ArrayList<>();
- // current location for measurements in SELECT
- int loc = 0;
-
for (int i = 0; i < suffixPaths.size(); i++) { // per suffix in SELECT
Path suffixPath = suffixPaths.get(i);
- Set<String> nonExistMeasurement = new HashSet<>();
// to record measurements in the loop of a suffix path
Set<String> measurementSetOfGivenSuffix = new LinkedHashSet<>();
// if const measurement
if (suffixPath.startWith("'") || suffixPath.startWith("\"")) {
- alignByDevicePlan.addConstMeasurement(loc++, suffixPath.getMeasurement());
+ measurements.add(suffixPath.getMeasurement());
+ measurementTypeMap.put(suffixPath.getMeasurement(), MeasurementType.Constant);
continue;
}
@@ -297,7 +297,11 @@ public class PhysicalGenerator {
// for actual non exist path
if (actualPaths.isEmpty() && originAggregations.isEmpty()) {
- nonExistMeasurement.add(fullPath.getMeasurement());
+ String nonExistMeasurement = fullPath.getMeasurement();
+ if (measurementSetOfGivenSuffix.add(nonExistMeasurement)
+ && measurementTypeMap.get(nonExistMeasurement) != MeasurementType.Exist) {
+ measurementTypeMap.put(fullPath.getMeasurement(), MeasurementType.NonExist);
+ }
}
for (String pathStr : actualPaths) {
@@ -316,20 +320,21 @@ public class PhysicalGenerator {
measurementChecked = path.getMeasurement();
}
TSDataType dataType = getSeriesType(pathForDataType);
- if (dataTypeConsistencyChecker.containsKey(measurementChecked)) {
- if (!dataType.equals(dataTypeConsistencyChecker.get(measurementChecked))) {
+ if (measurementDataTypeMap.containsKey(measurementChecked)) {
+ if (!dataType.equals(measurementDataTypeMap.get(measurementChecked))) {
throw new QueryProcessException(
"The data types of the same measurement column should be the same across "
+ "devices in ALIGN_BY_DEVICE sql. For more details please refer to the "
+ "SQL document.");
}
} else {
- dataTypeConsistencyChecker.put(measurementChecked, dataType);
+ measurementDataTypeMap.put(measurementChecked, dataType);
}
- // update measurementSetOfGivenSuffix and measurement location
- if (measurementSetOfGivenSuffix.add(measurementChecked)) {
- loc++;
+ // update measurementSetOfGivenSuffix and Normal measurement
+ if (measurementSetOfGivenSuffix.add(measurementChecked)
+ || measurementTypeMap.get(measurementChecked) != MeasurementType.Exist) {
+ measurementTypeMap.put(measurementChecked, MeasurementType.Exist);
}
// update deviceToMeasurementsMap
if (!deviceToMeasurementsMap.containsKey(device)) {
@@ -347,11 +352,6 @@ public class PhysicalGenerator {
}
}
- nonExistMeasurement.removeAll(measurementSetOfGivenSuffix);
- // update notExistMeasurement
- for (String notExistMeasurementString : nonExistMeasurement) {
- alignByDevicePlan.addNotExistMeasurement(loc++, notExistMeasurementString);
- }
// update measurements
// Note that in the loop of a suffix path, set is used.
// And across the loops of suffix paths, list is used.
@@ -372,7 +372,8 @@ public class PhysicalGenerator {
// assigns to alignByDevicePlan
alignByDevicePlan.setMeasurements(measurements);
alignByDevicePlan.setDeviceToMeasurementsMap(deviceToMeasurementsMap);
- alignByDevicePlan.setDataTypeConsistencyChecker(dataTypeConsistencyChecker);
+ alignByDevicePlan.setMeasurementDataTypeMap(measurementDataTypeMap);
+ alignByDevicePlan.setMeasurementTypeMap(measurementTypeMap);
alignByDevicePlan.setPaths(paths);
// get deviceToFilterMap
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index 7974bfb..a2c3897 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.dataset;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -28,6 +29,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
@@ -53,18 +55,11 @@ public class AlignByDeviceDataSet extends QueryDataSet {
private QueryContext context;
private IExpression expression;
- private List<String> deduplicatedMeasurementColumns;
+ private List<String> measurements;
private Map<String, Set<String>> deviceToMeasurementsMap;
private Map<String, IExpression> deviceToFilterMap;
+ private Map<String, MeasurementType> measurementTypeMap;
- // the measurements that do not exist in any device,
- // data type is considered as Boolean. The value is considered as null
- private List<String> notExistMeasurements;
- private List<Integer> positionOfNotExistMeasurements;
- // the measurements that have quotation mark. e.g. "abc",
- // '11', the data type is considered as String and the value is considered is the same with measurement name
- private List<String> constMeasurements;
- private List<Integer> positionOfConstMeasurements;
private GroupByPlan groupByPlan;
private FillQueryPlan fillQueryPlan;
@@ -75,24 +70,20 @@ public class AlignByDeviceDataSet extends QueryDataSet {
private Iterator<String> deviceIterator;
private String currentDevice;
private QueryDataSet currentDataSet;
- private int[] currentColumnMapRelation;
private Map<Path, TSDataType> tsDataTypeMap;
+ private List<String> executeColumns;
public AlignByDeviceDataSet(AlignByDevicePlan alignByDevicePlan,
QueryContext context,
IQueryRouter queryRouter) {
super(null, alignByDevicePlan.getDataTypes());
- // get deduplicated measurement columns (already deduplicated in TSServiceImpl.getAlignByDeviceQueryHeaders)
- this.deduplicatedMeasurementColumns = alignByDevicePlan.getMeasurements();
+ this.measurements = alignByDevicePlan.getMeasurements();
this.tsDataTypeMap = alignByDevicePlan.getDataTypeMapping();
this.queryRouter = queryRouter;
this.context = context;
this.deviceToMeasurementsMap =
alignByDevicePlan.getDeviceToMeasurementsMap();
this.deviceToFilterMap = alignByDevicePlan.getDeviceToFilterMap();
- this.notExistMeasurements = alignByDevicePlan.getNotExistMeasurements();
- this.constMeasurements = alignByDevicePlan.getConstMeasurements();
- this.positionOfNotExistMeasurements = alignByDevicePlan.getPositionOfNotExistMeasurements();
- this.positionOfConstMeasurements = alignByDevicePlan.getPositionOfConstMeasurements();
+ this.measurementTypeMap = alignByDevicePlan.getMeasurementTypeMap();
switch (alignByDevicePlan.getOperatorType()) {
case GROUPBY:
@@ -114,7 +105,6 @@ public class AlignByDeviceDataSet extends QueryDataSet {
this.curDataSetInitialized = false;
this.deviceIterator = deviceToMeasurementsMap.keySet().iterator();
- this.currentColumnMapRelation = new int[deduplicatedMeasurementColumns.size()];
}
protected boolean hasNextWithoutConstraint() throws IOException {
@@ -125,30 +115,11 @@ public class AlignByDeviceDataSet extends QueryDataSet {
}
while (deviceIterator.hasNext()) {
- for (int i = 0; i < deduplicatedMeasurementColumns.size(); i++) {
- currentColumnMapRelation[i] = -1;
- }
currentDevice = deviceIterator.next();
Set<String> measurementColumnsOfGivenDevice = deviceToMeasurementsMap
.get(currentDevice);
+ executeColumns = new ArrayList<>(measurementColumnsOfGivenDevice);
- // get columns to execute for the current device and the column map relation
- // e.g. root.sg.d0's measurementColumnsOfGivenDevice is {s2,s3}, and
- // deduplicatedMeasurementColumns is {s1,s2,s3,s4,s5},
- // then the final executeColumns is [s2,s3], currentColumnMapRelation is [-1,0,1,-1,-1].
- List<String> executeColumns = new ArrayList<>();
- int indexInExecuteColumns = -1;
- for (String column : measurementColumnsOfGivenDevice) {
- for (int i = 0; i < deduplicatedMeasurementColumns.size(); i++) {
- String columnToExecute = deduplicatedMeasurementColumns.get(i);
- if (columnToExecute.equals(column)) {
- executeColumns.add(column);
- indexInExecuteColumns++;
- currentColumnMapRelation[i] = indexInExecuteColumns;
- break;
- }
- }
- }
// extract paths and aggregations if exist from executeColumns
List<Path> executePaths = new ArrayList<>();
List<TSDataType> tsDataTypes = new ArrayList<>();
@@ -223,43 +194,32 @@ public class AlignByDeviceDataSet extends QueryDataSet {
rowRecord.addField(deviceField);
List<Field> measurementFields = originRowRecord.getFields();
- for (int mapPos : currentColumnMapRelation) {
- if (mapPos == -1) {
- rowRecord.addField(null);
- } else {
- rowRecord.addField(measurementFields.get(mapPos));
- }
+ Map<String, Field> currentColumnMap = new HashMap<>();
+ for (int i = 0; i < measurementFields.size(); i++) {
+ currentColumnMap.put(executeColumns.get(i), measurementFields.get(i));
}
- // build record with constant and non exist measurement
- RowRecord outRecord = new RowRecord(originRowRecord.getTimestamp());
- int loc = 0;
- int totalSize = notExistMeasurements.size() + constMeasurements.size()
- + rowRecord.getFields().size();
- int notExistMeasurementsLoc = 0;
- int constMeasurementsLoc = 0;
- int resLoc = 0;
- // don't forget device column, so loc - 1 is for looking up constant and non exist column
- while (loc < totalSize) {
- if (notExistMeasurementsLoc < notExistMeasurements.size()
- && loc - 1 == positionOfNotExistMeasurements.get(notExistMeasurementsLoc)) {
- outRecord.addField(null);
- notExistMeasurementsLoc++;
- } else if (constMeasurementsLoc < constMeasurements.size()
- && loc - 1 == positionOfConstMeasurements.get(constMeasurementsLoc)) {
- Field res = new Field(TSDataType.TEXT);
- res.setBinaryV(Binary.valueOf(constMeasurements.get(constMeasurementsLoc)));
- outRecord.addField(res);
- constMeasurementsLoc++;
- } else {
- outRecord.addField(rowRecord.getFields().get(resLoc));
- resLoc++;
+ for (String measurement : measurements) {
+ switch (measurementTypeMap.get(measurement)) {
+ case Exist:
+ if (currentColumnMap.get(measurement) != null) {
+ rowRecord.addField(currentColumnMap.get(measurement));
+ } else {
+ rowRecord.addField(new Field(null));
+ }
+ break;
+ case NonExist:
+ rowRecord.addField(new Field(null));
+ break;
+ case Constant:
+ Field res = new Field(TSDataType.TEXT);
+ res.setBinaryV(Binary.valueOf(measurement));
+ rowRecord.addField(res);
+ break;
}
-
- loc++;
}
- return outRecord;
+ return rowRecord;
}
private enum DataSetType {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 071181b..c7f892b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -18,6 +18,9 @@
*/
package org.apache.iotdb.db.service;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.SQLException;
@@ -28,6 +31,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -60,6 +64,7 @@ import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -192,7 +197,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
//check the version compatibility
boolean compatible = checkCompatibility(req.getClient_protocol());
if (!compatible) {
- tsStatus = getStatus(TSStatusCode.INCOMPATIBLE_VERSION, "The version is incompatible, please upgrade to " + IoTDBConstant.VERSION);
+ tsStatus = getStatus(TSStatusCode.INCOMPATIBLE_VERSION,
+ "The version is incompatible, please upgrade to " + IoTDBConstant.VERSION);
TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus,
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V2);
resp.setSessionId(sessionId);
@@ -208,7 +214,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
tsStatus = getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR);
}
TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus,
- TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V2);
+ TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V2);
resp.setSessionId(sessionId);
logger.info(
"{}: Login status: {}. User : {}",
@@ -320,7 +326,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
/**
* convert from TSStatusCode to TSStatus, which has message appending with existed status message
*
- * @param statusType status type
+ * @param statusType status type
* @param appendMessage appending message
*/
private TSStatus getStatus(TSStatusCode statusType, String appendMessage) {
@@ -574,7 +580,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
/**
* @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByPlan, some
- * AuthorPlan
+ * AuthorPlan
*/
private TSExecuteStatementResp internalExecuteQueryStatement(
long statementId, PhysicalPlan plan, int fetchSize, String username) {
@@ -798,113 +804,54 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
private void getAlignByDeviceQueryHeaders(
AlignByDevicePlan plan, List<String> respColumns, List<String> columnTypes) {
- // set columns in TSExecuteStatementResp. Note this is without deduplication.
- respColumns.add(SQLConstant.GROUPBY_DEVICE_COLUMN_NAME);
+ // set columns in TSExecuteStatementResp.
+ respColumns.add(SQLConstant.ALIGNBY_DEVICE_COLUMN_NAME);
// get column types and do deduplication
columnTypes.add(TSDataType.TEXT.toString()); // the DEVICE column of ALIGN_BY_DEVICE result
List<TSDataType> deduplicatedColumnsType = new ArrayList<>();
deduplicatedColumnsType.add(TSDataType.TEXT); // the DEVICE column of ALIGN_BY_DEVICE result
- List<String> deduplicatedMeasurementColumns = new ArrayList<>();
- Set<String> tmpColumnSet = new HashSet<>();
- Map<String, TSDataType> checker = plan.getDataTypeConsistencyChecker();
- // build column header with constant and non exist column and deduplicate
- int loc = 0;
- // size of total column
- int totalSize = plan.getNotExistMeasurements().size() + plan.getConstMeasurements().size()
- + plan.getMeasurements().size();
- // not exist column loc
- int notExistMeasurementsLoc = 0;
- // constant column loc
- int constMeasurementsLoc = 0;
- // normal column loc
- int resLoc = 0;
- // after removing duplicate, we must shift column position
- int shiftLoc = 0;
- while (loc < totalSize) {
- boolean isNonExist = false;
- boolean isConstant = false;
- TSDataType type;
- String column;
- // not exist
- if (isOneMeasurementIn(loc,
- notExistMeasurementsLoc, plan.getPositionOfNotExistMeasurements())) {
- // for shifting
- plan.getPositionOfNotExistMeasurements().set(notExistMeasurementsLoc, loc - shiftLoc);
-
- type = TSDataType.TEXT;
- column = plan.getNotExistMeasurements().get(notExistMeasurementsLoc);
- notExistMeasurementsLoc++;
- isNonExist = true;
- }
- // constant
- else if (isOneMeasurementIn(loc,
- constMeasurementsLoc, plan.getPositionOfConstMeasurements())) {
- // for shifting
- plan.getPositionOfConstMeasurements().set(constMeasurementsLoc, loc - shiftLoc);
-
- type = TSDataType.TEXT;
- column = plan.getConstMeasurements().get(constMeasurementsLoc);
- constMeasurementsLoc++;
- isConstant = true;
- }
- // normal series
- else {
- type = checker.get(plan.getMeasurements().get(resLoc));
- column = plan.getMeasurements().get(resLoc);
- resLoc++;
+ Set<String> deduplicatedMeasurements = new LinkedHashSet<>();
+ Map<String, TSDataType> checker = plan.getMeasurementDataTypeMap();
+
+ // build column header with constant and non exist column and deduplication
+ List<String> measurements = plan.getMeasurements();
+ Map<String, MeasurementType> measurementTypeMap = plan.getMeasurementTypeMap();
+ for (String measurement : measurements) {
+ TSDataType type = null;
+ switch (measurementTypeMap.get(measurement)) {
+ case Exist:
+ type = checker.get(measurement);
+ break;
+ case NonExist:
+ case Constant:
+ type = TSDataType.TEXT;
}
-
+ respColumns.add(measurement);
columnTypes.add(type.toString());
- respColumns.add(column);
- // deduplicate part
- if (!tmpColumnSet.contains(column)) {
- // Note that this deduplication strategy is consistent with that of client
- // IoTDBQueryResultSet.
- tmpColumnSet.add(column);
- if (!isNonExist && !isConstant) {
- // only refer to those normal measurements
- deduplicatedMeasurementColumns.add(column);
- }
+
+ if (!deduplicatedMeasurements.contains(measurement)) {
+ deduplicatedMeasurements.add(measurement);
deduplicatedColumnsType.add(type);
- } else if (isConstant) {
- shiftLoc++;
- constMeasurementsLoc--;
- plan.getConstMeasurements().remove(constMeasurementsLoc);
- plan.getPositionOfConstMeasurements().remove(constMeasurementsLoc);
- } else if (isNonExist) {
- shiftLoc++;
- notExistMeasurementsLoc--;
- plan.getNotExistMeasurements().remove(notExistMeasurementsLoc);
- plan.getPositionOfNotExistMeasurements().remove(notExistMeasurementsLoc);
- } else {
- shiftLoc++;
}
-
- loc++;
}
// save deduplicated measurementColumn names and types in QueryPlan for the next stage to use.
- // i.e., used by DeviceIterateDataSet constructor in `fetchResults` stage.
- plan.setMeasurements(deduplicatedMeasurementColumns);
+ // i.e., used by AlignByDeviceDataSet constructor in `fetchResults` stage.
+ plan.setMeasurements(new ArrayList<>(deduplicatedMeasurements));
plan.setDataTypes(deduplicatedColumnsType);
// set these null since they are never used henceforth in ALIGN_BY_DEVICE query processing.
plan.setPaths(null);
- plan.setDataTypeConsistencyChecker(null);
}
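The rewritten `getAlignByDeviceQueryHeaders` replaces the old position-shifting loop with a single pass over `plan.getMeasurements()`, resolving each column's type from `MeasurementType` and deduplicating with a `LinkedHashSet`. The following is a minimal, self-contained sketch of that idea, not the IoTDB code itself. The `MeasurementType` constants (`Exist`, `NonExist`, `Constant`) come from the patch; `DataType`, `Headers`, `build`, and the `"Device"` column label are hypothetical stand-ins for `TSDataType`, the plan/response lists, and `SQLConstant.ALIGNBY_DEVICE_COLUMN_NAME`. Unlike the patch, the sketch fails explicitly when a measurement has no kind or no data type (in the patch, a missing map entry would surface as a `NullPointerException`).

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class AlignByDeviceHeaderSketch {

  // Mirrors AlignByDevicePlan.MeasurementType from the patch.
  enum MeasurementType { Exist, NonExist, Constant }

  // Hypothetical stand-in for TSDataType (only a few values needed here).
  enum DataType { INT64, DOUBLE, TEXT }

  // Assumed label for the leading device column.
  static final String DEVICE_COLUMN = "Device";

  static final class Headers {
    final List<String> respColumns = new ArrayList<>();       // full header, duplicates kept
    final List<String> columnTypes = new ArrayList<>();       // one type per header column
    final List<String> dedupMeasurements = new ArrayList<>(); // first occurrence of each measurement
    final List<DataType> dedupTypes = new ArrayList<>();      // device TEXT + one per dedup measurement
  }

  static Headers build(List<String> measurements,
      Map<String, MeasurementType> kindOf, Map<String, DataType> dataTypeOf) {
    Headers h = new Headers();
    h.respColumns.add(DEVICE_COLUMN);
    h.columnTypes.add(DataType.TEXT.name());
    h.dedupTypes.add(DataType.TEXT);

    // LinkedHashSet keeps insertion order, so deduplicated columns stay in SELECT order.
    Set<String> seen = new LinkedHashSet<>();
    for (String m : measurements) {
      MeasurementType kind = kindOf.get(m);
      if (kind == null) {
        throw new IllegalArgumentException("no MeasurementType for " + m);
      }
      DataType type;
      switch (kind) {
        case Exist:
          type = dataTypeOf.get(m);
          if (type == null) {
            throw new IllegalArgumentException("no data type for " + m);
          }
          break;
        case NonExist:
        case Constant:
          type = DataType.TEXT; // non-existent and constant columns are typed as TEXT
          break;
        default:
          throw new IllegalStateException("unexpected kind " + kind);
      }
      h.respColumns.add(m);
      h.columnTypes.add(type.name());
      if (seen.add(m)) { // add() returns true only on the first occurrence
        h.dedupTypes.add(type);
      }
    }
    h.dedupMeasurements.addAll(seen);
    return h;
  }

  public static void main(String[] args) {
    // Hypothetical input: s1 exists, s2 does not, 'hello' is a constant, s1 is repeated.
    Headers h = build(
        List.of("s1", "s2", "'hello'", "s1"),
        Map.of("s1", MeasurementType.Exist, "s2", MeasurementType.NonExist,
            "'hello'", MeasurementType.Constant),
        Map.of("s1", DataType.INT64));
    System.out.println(h.respColumns);       // [Device, s1, s2, 'hello', s1]
    System.out.println(h.columnTypes);       // [TEXT, INT64, TEXT, TEXT, INT64]
    System.out.println(h.dedupMeasurements); // [s1, s2, 'hello']
    System.out.println(h.dedupTypes);        // [TEXT, INT64, TEXT, TEXT]
  }
}
```

Compared with the removed code, which kept separate position lists for non-existent and constant measurements and had to shift them whenever a duplicate was dropped, one ordered measurement list plus a per-name type map means deduplication never has to rewrite column indices.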
- /**
- *
- * @param subLoc
- * @param totalLoc
- * @param measurementPositions
- * @return true if the measurement at totalLoc is the subLoc measurement in measurementPositions,
- * false otherwise
- */
- private boolean isOneMeasurementIn(int totalLoc,
- int subLoc, List<Integer> measurementPositions) {
- return subLoc < measurementPositions.size() && totalLoc == measurementPositions.get(subLoc);
+ private void getLastQueryHeaders(
+ QueryPlan plan, List<String> respColumns, List<String> columnTypes)
+ throws TException, QueryProcessException {
+ respColumns.add(COLUMN_TIMESERIES);
+ respColumns.add(COLUMN_VALUE);
+ columnTypes.add(TSDataType.TEXT.toString());
+ columnTypes.add(TSDataType.TEXT.toString());
}
@Override