This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 20804df90e [IOTDB-3334] Fix mismatched data columns and headers bug in
AlignByDevice query (#6104)
20804df90e is described below
commit 20804df90ebfdaeb2997c09acf12184ef34f1983
Author: liuminghui233 <[email protected]>
AuthorDate: Wed Jun 1 12:31:30 2022 +0800
[IOTDB-3334] Fix mismatched data columns and headers bug in AlignByDevice
query (#6104)
---
.../apache/iotdb/db/metadata/utils/MetaUtils.java | 22 +++
.../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 8 +-
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 177 +++++++++++++++++----
.../iotdb/db/mpp/plan/planner/LogicalPlanner.java | 62 ++++++--
.../db/mpp/plan/plan/QueryLogicalPlanUtil.java | 14 +-
5 files changed, 226 insertions(+), 57 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
index 96189b8551..1ca7540395 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
@@ -91,6 +91,28 @@ public class MetaUtils {
return result;
}
+ public static List<PartialPath> groupAlignedSeries(List<PartialPath>
fullPaths) {
+ List<PartialPath> result = new ArrayList<>();
+ Map<String, AlignedPath> deviceToAlignedPathMap = new HashMap<>();
+ for (PartialPath path : fullPaths) {
+ MeasurementPath measurementPath = (MeasurementPath) path;
+ if (!measurementPath.isUnderAlignedEntity()) {
+ result.add(measurementPath);
+ } else {
+ String deviceName = measurementPath.getDevice();
+ if (!deviceToAlignedPathMap.containsKey(deviceName)) {
+ AlignedPath alignedPath = new AlignedPath(measurementPath);
+ deviceToAlignedPathMap.put(deviceName, alignedPath);
+ } else {
+ AlignedPath alignedPath = deviceToAlignedPathMap.get(deviceName);
+ alignedPath.addMeasurement(measurementPath);
+ }
+ }
+ }
+ result.addAll(deviceToAlignedPathMap.values());
+ return result;
+ }
+
@TestOnly
public static List<String> getMultiFullPaths(IMNode node) {
if (node == null) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index cb63f3e607..8418241072 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -229,10 +229,10 @@ public class Analyzer {
.distinct()
.collect(Collectors.toList());
for (String deviceName : deviceToMeasurementsMap.keySet()) {
- List<String> measurementsUnderDeivce =
+ List<String> measurementsUnderDevice =
new ArrayList<>(deviceToMeasurementsMap.get(deviceName));
List<Integer> indexes = new ArrayList<>();
- for (String measurement : measurementsUnderDeivce) {
+ for (String measurement : measurementsUnderDevice) {
indexes.add(
allMeasurements.indexOf(measurement) + 1); // add 1 to skip
the device column
}
@@ -249,8 +249,8 @@ public class Analyzer {
Map<String, Set<Expression>> deviceToAggregationTransformExpressions
= new HashMap<>();
for (String deviceName : deviceToTransformExpressions.keySet()) {
Set<Expression> transformExpressions =
deviceToTransformExpressions.get(deviceName);
- Set<Expression> aggregationExpressions = new HashSet<>();
- Set<Expression> aggregationTransformExpressions = new HashSet<>();
+ Set<Expression> aggregationExpressions = new LinkedHashSet<>();
+ Set<Expression> aggregationTransformExpressions = new
LinkedHashSet<>();
boolean isHasRawDataInputAggregation = false;
if (queryStatement.isAggregationQuery()) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index d789e58892..ecb7b4e73c 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -63,6 +63,7 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggreg
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
@@ -87,6 +88,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkArgument;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
public class LogicalPlanBuilder {
@@ -115,7 +117,7 @@ public class LogicalPlanBuilder {
sourceExpressions.stream()
.map(expression -> ((TimeSeriesOperand) expression).getPath())
.collect(Collectors.toList());
- List<PartialPath> groupedPaths =
MetaUtils.groupAlignedPaths(selectedPaths);
+ List<PartialPath> groupedPaths =
MetaUtils.groupAlignedSeries(selectedPaths);
for (PartialPath path : groupedPaths) {
if (path instanceof MeasurementPath) { // non-aligned series
SeriesScanNode seriesScanNode =
@@ -160,46 +162,147 @@ public class LogicalPlanBuilder {
public LogicalPlanBuilder planAggregationSource(
Set<Expression> sourceExpressions,
+ AggregationStep curStep,
OrderBy scanOrder,
Filter timeFilter,
GroupByTimeParameter groupByTimeParameter,
Set<Expression> aggregationExpressions,
Map<Expression, Set<Expression>> groupByLevelExpressions,
TypeProvider typeProvider) {
- AggregationStep curStep =
- (groupByLevelExpressions != null
- || (groupByTimeParameter != null &&
groupByTimeParameter.hasOverlap()))
- ? AggregationStep.PARTIAL
- : AggregationStep.SINGLE;
- List<PlanNode> sourceNodeList = new ArrayList<>();
boolean needCheckAscending = groupByTimeParameter == null;
Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new
HashMap<>();
Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new
HashMap<>();
for (Expression sourceExpression : sourceExpressions) {
- AggregationType aggregationFunction =
- AggregationType.valueOf(
- ((FunctionExpression)
sourceExpression).getFunctionName().toUpperCase());
+ createAggregationDescriptor(
+ (FunctionExpression) sourceExpression,
+ curStep,
+ scanOrder,
+ needCheckAscending,
+ typeProvider,
+ ascendingAggregations,
+ descendingAggregations);
+ }
+
+ List<PlanNode> sourceNodeList =
+ constructSourceNodeFromAggregationDescriptors(
+ ascendingAggregations,
+ descendingAggregations,
+ scanOrder,
+ timeFilter,
+ groupByTimeParameter);
+
+ return convergeAggregationSource(
+ sourceNodeList,
+ curStep,
+ scanOrder,
+ groupByTimeParameter,
+ aggregationExpressions,
+ groupByLevelExpressions);
+ }
+
+ public LogicalPlanBuilder planAggregationSourceWithIndexAdjust(
+ Set<Expression> sourceExpressions,
+ AggregationStep curStep,
+ OrderBy scanOrder,
+ Filter timeFilter,
+ GroupByTimeParameter groupByTimeParameter,
+ Set<Expression> aggregationExpressions,
+ List<Integer> measurementIndexes,
+ Map<Expression, Set<Expression>> groupByLevelExpressions,
+ TypeProvider typeProvider) {
+ checkArgument(
+ sourceExpressions.size() == measurementIndexes.size(),
+ "Each aggregate should correspond to a column of output.");
+
+ boolean needCheckAscending = groupByTimeParameter == null;
+ Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new
HashMap<>();
+ Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new
HashMap<>();
+ Map<AggregationDescriptor, Integer> aggregationToMeasurementIndexMap = new
HashMap<>();
+
+ int index = 0;
+ for (Expression sourceExpression : sourceExpressions) {
AggregationDescriptor aggregationDescriptor =
- new AggregationDescriptor(
- aggregationFunction, curStep, sourceExpression.getExpressions());
- if (curStep.isOutputPartial()) {
- updateTypeProviderByPartialAggregation(aggregationDescriptor,
typeProvider);
- }
- PartialPath selectPath =
- ((TimeSeriesOperand)
sourceExpression.getExpressions().get(0)).getPath();
- if (!needCheckAscending
- || SchemaUtils.isConsistentWithScanOrder(aggregationFunction,
scanOrder)) {
- ascendingAggregations
- .computeIfAbsent(selectPath, key -> new ArrayList<>())
- .add(aggregationDescriptor);
- } else {
- descendingAggregations
- .computeIfAbsent(selectPath, key -> new ArrayList<>())
- .add(aggregationDescriptor);
- }
+ createAggregationDescriptor(
+ (FunctionExpression) sourceExpression,
+ curStep,
+ scanOrder,
+ needCheckAscending,
+ typeProvider,
+ ascendingAggregations,
+ descendingAggregations);
+ aggregationToMeasurementIndexMap.put(aggregationDescriptor,
measurementIndexes.get(index));
+ index++;
+ }
+
+ List<PlanNode> sourceNodeList =
+ constructSourceNodeFromAggregationDescriptors(
+ ascendingAggregations,
+ descendingAggregations,
+ scanOrder,
+ timeFilter,
+ groupByTimeParameter);
+
+ if (!curStep.isOutputPartial()) {
+ // update measurementIndexes
+ measurementIndexes.clear();
+ measurementIndexes.addAll(
+ sourceNodeList.stream()
+ .map(
+ planNode ->
+ ((SeriesAggregationSourceNode)
planNode).getAggregationDescriptorList())
+ .flatMap(List::stream)
+ .map(aggregationToMeasurementIndexMap::get)
+ .collect(Collectors.toList()));
+ }
+
+ return convergeAggregationSource(
+ sourceNodeList,
+ curStep,
+ scanOrder,
+ groupByTimeParameter,
+ aggregationExpressions,
+ groupByLevelExpressions);
+ }
+
+ private AggregationDescriptor createAggregationDescriptor(
+ FunctionExpression sourceExpression,
+ AggregationStep curStep,
+ OrderBy scanOrder,
+ boolean needCheckAscending,
+ TypeProvider typeProvider,
+ Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations,
+ Map<PartialPath, List<AggregationDescriptor>> descendingAggregations) {
+ AggregationType aggregationFunction =
+
AggregationType.valueOf(sourceExpression.getFunctionName().toUpperCase());
+ AggregationDescriptor aggregationDescriptor =
+ new AggregationDescriptor(aggregationFunction, curStep,
sourceExpression.getExpressions());
+ if (curStep.isOutputPartial()) {
+ updateTypeProviderByPartialAggregation(aggregationDescriptor,
typeProvider);
+ }
+ PartialPath selectPath =
+ ((TimeSeriesOperand)
sourceExpression.getExpressions().get(0)).getPath();
+ if (!needCheckAscending
+ || SchemaUtils.isConsistentWithScanOrder(aggregationFunction,
scanOrder)) {
+ ascendingAggregations
+ .computeIfAbsent(selectPath, key -> new ArrayList<>())
+ .add(aggregationDescriptor);
+ } else {
+ descendingAggregations
+ .computeIfAbsent(selectPath, key -> new ArrayList<>())
+ .add(aggregationDescriptor);
}
+ return aggregationDescriptor;
+ }
+ private List<PlanNode> constructSourceNodeFromAggregationDescriptors(
+ Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations,
+ Map<PartialPath, List<AggregationDescriptor>> descendingAggregations,
+ OrderBy scanOrder,
+ Filter timeFilter,
+ GroupByTimeParameter groupByTimeParameter) {
+ List<PlanNode> sourceNodeList = new ArrayList<>();
+ boolean needCheckAscending = groupByTimeParameter == null;
Map<PartialPath, List<AggregationDescriptor>> groupedAscendingAggregations
=
MetaUtils.groupAlignedAggregations(ascendingAggregations);
for (Map.Entry<PartialPath, List<AggregationDescriptor>>
pathAggregationsEntry :
@@ -223,11 +326,20 @@ public class LogicalPlanBuilder {
pathAggregationsEntry.getKey(),
pathAggregationsEntry.getValue(),
scanOrder,
- groupByTimeParameter,
+ null,
timeFilter));
}
}
+ return sourceNodeList;
+ }
+ private LogicalPlanBuilder convergeAggregationSource(
+ List<PlanNode> sourceNodeList,
+ AggregationStep curStep,
+ OrderBy scanOrder,
+ GroupByTimeParameter groupByTimeParameter,
+ Set<Expression> aggregationExpressions,
+ Map<Expression, Set<Expression>> groupByLevelExpressions) {
if (curStep.isOutputPartial()) {
if (groupByTimeParameter != null && groupByTimeParameter.hasOverlap()) {
curStep =
@@ -264,6 +376,7 @@ public class LogicalPlanBuilder {
} else {
this.root = convergeWithTimeJoin(sourceNodeList, scanOrder);
}
+
return this;
}
@@ -419,7 +532,7 @@ public class LogicalPlanBuilder {
scanOrder);
}
- private PlanNode createAggregationScanNode(
+ private SeriesAggregationSourceNode createAggregationScanNode(
PartialPath selectPath,
List<AggregationDescriptor> aggregationDescriptorList,
OrderBy scanOrder,
@@ -486,9 +599,9 @@ public class LogicalPlanBuilder {
}
public LogicalPlanBuilder planTransform(
- Set<Expression> selectExpressions, boolean isGroupByTime, ZoneId zoneId)
{
+ Set<Expression> transformExpressions, boolean isGroupByTime, ZoneId
zoneId) {
boolean needTransform = false;
- for (Expression expression : selectExpressions) {
+ for (Expression expression : transformExpressions) {
if (ExpressionAnalyzer.checkIsNeedTransform(expression)) {
needTransform = true;
break;
@@ -502,7 +615,7 @@ public class LogicalPlanBuilder {
new TransformNode(
context.getQueryId().genPlanNodeId(),
this.getRoot(),
- selectExpressions.toArray(new Expression[0]),
+ transformExpressions.toArray(new Expression[0]),
isGroupByTime,
zoneId);
return this;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index b6fa90d6c2..f29a6b17bd 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.planner;
import org.apache.iotdb.db.metadata.utils.TimeseriesVersionUtil;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.analyze.ExpressionAnalyzer;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.optimization.PlanOptimizer;
import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
@@ -141,6 +142,7 @@ public class LogicalPlanner {
analysis.getDeviceToQueryFilter() != null
? analysis.getDeviceToQueryFilter().get(deviceName)
: null,
+
analysis.getDeviceToMeasurementIndexesMap().get(deviceName),
context));
deviceToSubPlanMap.put(deviceName, subPlanBuilder.getRoot());
}
@@ -164,6 +166,7 @@ public class LogicalPlanner {
analysis.getAggregationTransformExpressions(),
analysis.getTransformExpressions(),
analysis.getQueryFilter(),
+ null,
context));
}
@@ -186,6 +189,7 @@ public class LogicalPlanner {
Set<Expression> aggregationTransformExpressions,
Set<Expression> transformExpressions,
Expression queryFilter,
+ List<Integer> measurementIndexes, // only used in ALIGN BY DEVICE
MPPQueryContext context) {
LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
@@ -272,20 +276,50 @@ public class LogicalPlanner {
}
}
} else {
- planBuilder =
- planBuilder
- .planAggregationSource(
- sourceExpressions,
- queryStatement.getResultOrder(),
- analysis.getGlobalTimeFilter(),
- analysis.getGroupByTimeParameter(),
- aggregationExpressions,
- analysis.getGroupByLevelExpressions(),
- analysis.getTypeProvider())
- .planTransform(
- transformExpressions,
- queryStatement.isGroupByTime(),
- queryStatement.getSelectComponent().getZoneId());
+ AggregationStep curStep =
+ (analysis.getGroupByLevelExpressions() != null
+ || (analysis.getGroupByTimeParameter() != null
+ && analysis.getGroupByTimeParameter().hasOverlap()))
+ ? AggregationStep.PARTIAL
+ : AggregationStep.SINGLE;
+
+ boolean needTransform = false;
+ for (Expression expression : transformExpressions) {
+ if (ExpressionAnalyzer.checkIsNeedTransform(expression)) {
+ needTransform = true;
+ break;
+ }
+ }
+
+ if (!needTransform && measurementIndexes != null) {
+ planBuilder =
+ planBuilder.planAggregationSourceWithIndexAdjust(
+ sourceExpressions,
+ curStep,
+ queryStatement.getResultOrder(),
+ analysis.getGlobalTimeFilter(),
+ analysis.getGroupByTimeParameter(),
+ aggregationExpressions,
+ measurementIndexes,
+ analysis.getGroupByLevelExpressions(),
+ analysis.getTypeProvider());
+ } else {
+ planBuilder =
+ planBuilder
+ .planAggregationSource(
+ sourceExpressions,
+ curStep,
+ queryStatement.getResultOrder(),
+ analysis.getGlobalTimeFilter(),
+ analysis.getGroupByTimeParameter(),
+ aggregationExpressions,
+ analysis.getGroupByLevelExpressions(),
+ analysis.getTypeProvider())
+ .planTransform(
+ transformExpressions,
+ queryStatement.isGroupByTime(),
+ queryStatement.getSelectComponent().getZoneId());
+ }
}
return planBuilder.getRoot();
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
index f38534b825..9eda86a8d0 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
@@ -152,11 +152,6 @@ public class QueryLogicalPlanUtil {
QueryId queryId = new QueryId("test");
List<PlanNode> sourceNodeList = new ArrayList<>();
- sourceNodeList.add(
- new AlignedSeriesScanNode(
- queryId.genPlanNodeId(),
- (AlignedPath) schemaMap.get("root.sg.d2.a"),
- OrderBy.TIMESTAMP_ASC));
sourceNodeList.add(
new SeriesScanNode(
queryId.genPlanNodeId(),
@@ -172,6 +167,11 @@ public class QueryLogicalPlanUtil {
queryId.genPlanNodeId(),
(MeasurementPath) schemaMap.get("root.sg.d2.s4"),
OrderBy.TIMESTAMP_ASC));
+ sourceNodeList.add(
+ new AlignedSeriesScanNode(
+ queryId.genPlanNodeId(),
+ (AlignedPath) schemaMap.get("root.sg.d2.a"),
+ OrderBy.TIMESTAMP_ASC));
TimeJoinNode timeJoinNode =
new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC,
sourceNodeList);
OffsetNode offsetNode = new OffsetNode(queryId.genPlanNodeId(),
timeJoinNode, 10);
@@ -716,8 +716,8 @@ public class QueryLogicalPlanUtil {
new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_DESC,
sourceNodeList2);
Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
- deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2, 3));
- deviceToMeasurementIndexesMap.put("root.sg.d2", Arrays.asList(1, 2, 3));
+ deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 3, 2));
+ deviceToMeasurementIndexesMap.put("root.sg.d2", Arrays.asList(1, 3, 2));
DeviceViewNode deviceViewNode =
new DeviceViewNode(
queryId.genPlanNodeId(),