This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch alignbydevice in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b297a99f8de18712470552e32ddcd568be36bc7a Author: Alima777 <[email protected]> AuthorDate: Fri Nov 19 18:27:24 2021 +0800 recomplete align by device --- .../iotdb/db/qp/logical/crud/QueryOperator.java | 127 +++++++++++---------- .../db/qp/physical/crud/AlignByDevicePlan.java | 86 ++++++++------ .../iotdb/db/qp/physical/crud/MeasurementInfo.java | 37 ++---- .../iotdb/db/qp/physical/crud/QueryPlan.java | 11 +- .../db/query/dataset/AlignByDeviceDataSet.java | 98 ++++------------ .../org/apache/iotdb/db/service/TSServiceImpl.java | 29 ++--- .../iotdb/db/integration/IoTDBAlignByDeviceIT.java | 5 +- 7 files changed, 165 insertions(+), 228 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java index 0d2fb81..c4b5040 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java @@ -30,7 +30,6 @@ import org.apache.iotdb.db.qp.constant.SQLConstant; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan; -import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType; import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo; import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; @@ -173,8 +172,18 @@ public class QueryOperator extends Operator { } public void check() throws LogicalOperatorException { - if (isAlignByDevice() && selectComponent.hasTimeSeriesGeneratingFunction()) { - throw new LogicalOperatorException("ALIGN BY DEVICE clause is not supported in UDF queries."); + if (isAlignByDevice()) { + if (selectComponent.hasTimeSeriesGeneratingFunction()) { + throw new LogicalOperatorException( + "ALIGN BY DEVICE clause is not supported in UDF queries."); + } + + for (PartialPath path : selectComponent.getPaths()) { + String device = path.getDevice(); + if (!device.isEmpty()) { + throw new LogicalOperatorException(AlignByDevicePlan.MEASUREMENT_ERROR_MESSAGE); + } + } } } @@ -230,74 +239,54 @@ public class QueryOperator extends Operator { throws QueryProcessException { AlignByDevicePlan alignByDevicePlan = new AlignByDevicePlan(); - List<PartialPath> prefixPaths = fromComponent.getPrefixPaths(); // remove stars in fromPaths and get deviceId with deduplication - List<PartialPath> devices = removeStarsInDeviceWithUnique(prefixPaths); - List<ResultColumn> resultColumns = selectComponent.getResultColumns(); + List<PartialPath> devices = removeStarsInDeviceWithUnique(fromComponent.getPrefixPaths()); + List<ResultColumn> resultColumns = + convertSpecialClauseValues(alignByDevicePlan, selectComponent.getResultColumns()); List<String> aggregationFuncs = selectComponent.getAggregationFunctions(); // to record result measurement columns List<String> measurements = new ArrayList<>(); Map<String, MeasurementInfo> measurementInfoMap = new HashMap<>(); List<PartialPath> paths = new ArrayList<>(); + List<String> aggregations = new ArrayList<>(); - for (int i = 0; i < resultColumns.size(); i++) { // per suffix in SELECT + // per suffix in SELECT + for (int i = 0; i < resultColumns.size(); i++) { ResultColumn resultColumn = resultColumns.get(i); - Expression suffixExpression = resultColumn.getExpression(); - PartialPath suffixPath = getSuffixPathFromExpression(suffixExpression); + PartialPath suffixPath = getSuffixPathFromExpression(resultColumn.getExpression()); String aggregation = aggregationFuncs != null ? aggregationFuncs.get(i) : null; - // to record measurements in the loop of a suffix path Set<String> measurementSetOfGivenSuffix = new LinkedHashSet<>(); + + // concat suffix with per device for (PartialPath device : devices) { PartialPath fullPath = device.concatPath(suffixPath); try { // remove stars in SELECT to get actual paths List<MeasurementPath> actualPaths = getMatchedTimeseries(fullPath); - if (suffixPath.getNodes().length > 1) { - throw new QueryProcessException(AlignByDevicePlan.MEASUREMENT_ERROR_MESSAGE); - } if (resultColumn.hasAlias() && actualPaths.size() >= 2) { throw new QueryProcessException( String.format(AlignByDevicePlan.ALIAS_ERROR_MESSAGE, resultColumn.getAlias())); } - if (actualPaths.isEmpty()) { - String nonExistMeasurement = getMeasurementName(fullPath, aggregation); - if (measurementSetOfGivenSuffix.add(nonExistMeasurement)) { - measurementInfoMap.putIfAbsent( - nonExistMeasurement, new MeasurementInfo(MeasurementType.NonExist)); + for (MeasurementPath path : actualPaths) { + MeasurementInfo measurementInfo = + new MeasurementInfo(getMeasurementName(path, aggregation)); + TSDataType columnDataType = path.getSeriesType(); + if (aggregation != null) { + columnDataType = getAggregationType(aggregation); + aggregations.add(aggregation); } - } else { - for (PartialPath path : actualPaths) { - String measurementName = getMeasurementName(path, aggregation); - TSDataType measurementDataType = path.getSeriesType(); - TSDataType columnDataType = getAggregationType(aggregation); - columnDataType = columnDataType == null ? measurementDataType : columnDataType; - MeasurementInfo measurementInfo = - measurementInfoMap.getOrDefault(measurementName, new MeasurementInfo()); - - if (resultColumn.hasAlias()) { - measurementInfo.setMeasurementAlias(resultColumn.getAlias()); - } - - // check datatype consistency - // an inconsistent example: select s0 from root.sg1.d1, root.sg1.d2 align by device - // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT. - if (measurementInfo.getColumnDataType() != null) { - if (!columnDataType.equals(measurementInfo.getColumnDataType())) { - throw new QueryProcessException( - "The data types of the same measurement column should be the same across devices."); - } - } else { - measurementInfo.setColumnDataType(columnDataType); - measurementInfo.setMeasurementDataType(measurementDataType); - } - - measurementSetOfGivenSuffix.add(measurementName); - measurementInfo.setMeasurementType(MeasurementType.Exist); - measurementInfoMap.put(measurementName, measurementInfo); - // update paths - paths.add(path); + checkDataTypeConsistency( + columnDataType, measurementInfoMap.get(measurementInfo.getMeasurement())); + + if (!measurementInfoMap.containsKey(measurementInfo.getMeasurement())) { + measurementInfo.setMeasurementAlias( + resultColumn.hasAlias() ? resultColumn.getAlias() : null); + measurementInfo.setColumnDataType(columnDataType); + measurementInfoMap.put(measurementInfo.getMeasurement(), measurementInfo); } + measurementSetOfGivenSuffix.add(measurementInfo.getMeasurement()); + paths.add(path); } } catch (MetadataException | QueryProcessException e) { throw new QueryProcessException(e.getMessage()); @@ -313,22 +302,33 @@ public class QueryOperator extends Operator { measurements.addAll(measurementSetOfGivenSuffix); } - List<String> trimMeasurements = convertSpecialClauseValues(alignByDevicePlan, measurements); // assigns to alignByDevicePlan - alignByDevicePlan.setMeasurements(trimMeasurements); - alignByDevicePlan.setMeasurementInfoMap(measurementInfoMap); - alignByDevicePlan.setDevices(devices); + alignByDevicePlan.setMeasurements(measurements); alignByDevicePlan.setPaths(paths); + alignByDevicePlan.setAggregations(aggregations); + alignByDevicePlan.setMeasurementInfoMap(measurementInfoMap); alignByDevicePlan.setEnableTracing(enableTracing); + alignByDevicePlan.deduplicate(generator); + if (whereComponent != null) { alignByDevicePlan.setDeviceToFilterMap( - concatFilterByDevice(devices, whereComponent.getFilterOperator())); + concatFilterByDevice(alignByDevicePlan, devices, whereComponent.getFilterOperator())); } return alignByDevicePlan; } + private void checkDataTypeConsistency(TSDataType checkedDataType, MeasurementInfo measurementInfo) + throws QueryProcessException { + // check datatype consistency + // an inconsistent example: select s0 from root.sg1.d1, root.sg1.d2 align by device + // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT. + if (measurementInfo != null && !checkedDataType.equals(measurementInfo.getColumnDataType())) { + throw new QueryProcessException(AlignByDevicePlan.DATATYPE_ERROR_MESSAGE); + } + } + private void convertSpecialClauseValues(QueryPlan queryPlan) { if (specialClauseComponent != null) { queryPlan.setWithoutAllNull(specialClauseComponent.isWithoutAllNull()); @@ -340,16 +340,16 @@ public class QueryOperator extends Operator { } } - private List<String> convertSpecialClauseValues(QueryPlan queryPlan, List<String> measurements) - throws QueryProcessException { + private List<ResultColumn> convertSpecialClauseValues( + QueryPlan queryPlan, List<ResultColumn> resultColumns) throws QueryProcessException { convertSpecialClauseValues(queryPlan); // sLimit trim on the measurementColumnList if (specialClauseComponent.hasSlimit()) { int seriesSLimit = specialClauseComponent.getSeriesLimit(); int seriesOffset = specialClauseComponent.getSeriesOffset(); - return slimitTrimColumn(measurements, seriesSLimit, seriesOffset); + return slimitTrimColumn(resultColumns, seriesSLimit, seriesOffset); } - return measurements; + return resultColumns; } private List<PartialPath> removeStarsInDeviceWithUnique(List<PartialPath> paths) @@ -390,9 +390,10 @@ public class QueryOperator extends Operator { return initialMeasurement; } - private List<String> slimitTrimColumn(List<String> columnList, int seriesLimit, int seriesOffset) + private List<ResultColumn> slimitTrimColumn( + List<ResultColumn> resultColumns, int seriesLimit, int seriesOffset) throws QueryProcessException { - int size = columnList.size(); + int size = resultColumns.size(); // check parameter range if (seriesOffset >= size) { @@ -406,14 +407,15 @@ public class QueryOperator extends Operator { } // trim seriesPath list - return new ArrayList<>(columnList.subList(seriesOffset, endPosition)); + return new ArrayList<>(resultColumns.subList(seriesOffset, endPosition)); } // e.g. translate "select * from root.ln.d1, root.ln.d2 where s1 < 20 AND s2 > 10" to // [root.ln.d1 -> root.ln.d1.s1 < 20 AND root.ln.d1.s2 > 10, // root.ln.d2 -> root.ln.d2.s1 < 20 AND root.ln.d2.s2 > 10)] private Map<String, IExpression> concatFilterByDevice( - List<PartialPath> devices, FilterOperator operator) throws QueryProcessException { + AlignByDevicePlan alignByDevicePlan, List<PartialPath> devices, FilterOperator operator) + throws QueryProcessException { Map<String, IExpression> deviceToFilterMap = new HashMap<>(); Set<PartialPath> filterPaths = new HashSet<>(); Iterator<PartialPath> deviceIterator = devices.iterator(); @@ -425,6 +427,7 @@ public class QueryOperator extends Operator { concatFilterPath(device, newOperator, filterPaths); } catch (LogicalOptimizeException | MetadataException e) { deviceIterator.remove(); + alignByDevicePlan.removeDevice(device.getFullPath()); continue; } // transform to a list so it can be indexed diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java index a4079a1..c1c0897 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java @@ -22,11 +22,14 @@ import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.logical.Operator.OperatorType; import org.apache.iotdb.db.qp.strategy.PhysicalGenerator; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.expression.IExpression; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; public class AlignByDevicePlan extends QueryPlan { @@ -34,14 +37,17 @@ public class AlignByDevicePlan extends QueryPlan { "The paths of the SELECT clause can only be measurements or STAR."; public static final String ALIAS_ERROR_MESSAGE = "alias %s can only be matched with one time series"; + public static final String DATATYPE_ERROR_MESSAGE = + "The data types of the same measurement column should be the same across devices."; // to record result measurement columns, e.g. temperature, status, speed private List<String> measurements; - private List<TSDataType> dataTypes; private Map<String, MeasurementInfo> measurementInfoMap; + private List<PartialPath> deduplicatePaths; + private List<String> aggregations; - // to check data type consistency for the same name sensor of different devices - private List<PartialPath> devices; + // paths index of each device that need to execute + private Map<String, List<Integer>> deviceToPathIndex = new LinkedHashMap<>(); private Map<String, IExpression> deviceToFilterMap; private GroupByTimePlan groupByTimePlan; @@ -54,7 +60,40 @@ public class AlignByDevicePlan extends QueryPlan { @Override public void deduplicate(PhysicalGenerator physicalGenerator) { - // do nothing + Set<PartialPath> deduplicatePaths = new LinkedHashSet<>(); + List<String> deduplicatedAggregations = new ArrayList<>(); + for (int i = 0; i < paths.size(); i++) { + PartialPath path = paths.get(i); + if (!deduplicatePaths.contains(path)) { + deduplicatePaths.add(path); + if (this.aggregations != null) { + deduplicatedAggregations.add(this.aggregations.get(i)); + } + deviceToPathIndex + .computeIfAbsent(path.getDevice(), k -> new ArrayList<>()) + .add(deduplicatePaths.size() - 1); + } + } + // paths are deduplicated from here + this.deduplicatePaths = new ArrayList<>(deduplicatePaths); + setAggregations(deduplicatedAggregations); + this.paths = null; + } + + public List<PartialPath> getDeduplicatePaths() { + return deduplicatePaths; + } + + public void removeDevice(String device) { + deviceToPathIndex.remove(device); + } + + public void setMeasurementInfoMap(Map<String, MeasurementInfo> measurementInfoMap) { + this.measurementInfoMap = measurementInfoMap; + } + + public Map<String, MeasurementInfo> getMeasurementInfoMap() { + return measurementInfoMap; } public void setMeasurements(List<String> measurements) { @@ -65,21 +104,20 @@ public class AlignByDevicePlan extends QueryPlan { return measurements; } - @Override - public List<TSDataType> getDataTypes() { - return dataTypes; + public List<String> getAggregations() { + return aggregations; } - public void setDataTypes(List<TSDataType> dataTypes) { - this.dataTypes = dataTypes; + public void setAggregations(List<String> aggregations) { + this.aggregations = aggregations.isEmpty() ? null : aggregations; } - public void setDevices(List<PartialPath> devices) { - this.devices = devices; + public Map<String, List<Integer>> getDeviceToPathIndex() { + return deviceToPathIndex; } - public List<PartialPath> getDevices() { - return devices; + public void setDeviceToPathIndex(Map<String, List<Integer>> deviceToPathIndex) { + this.deviceToPathIndex = deviceToPathIndex; } public Map<String, IExpression> getDeviceToFilterMap() { @@ -116,24 +154,4 @@ public class AlignByDevicePlan extends QueryPlan { this.aggregationPlan = aggregationPlan; this.setOperatorType(Operator.OperatorType.AGGREGATION); } - - public void setMeasurementInfoMap(Map<String, MeasurementInfo> measurementInfoMap) { - this.measurementInfoMap = measurementInfoMap; - } - - public Map<String, MeasurementInfo> getMeasurementInfoMap() { - return measurementInfoMap; - } - - /** - * Exist: the measurements which don't belong to NonExist and Constant. NonExist: the measurements - * that do not exist in any device, data type is considered as String. The value is considered as - * null. Constant: the measurements that have quotation mark. e.g. "abc",'11'. The data type is - * considered as String and the value is the measurement name. - */ - public enum MeasurementType { - Exist, - NonExist, - Constant - } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java index 7c6dd17..c91493c 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java @@ -18,51 +18,38 @@ */ package org.apache.iotdb.db.qp.physical.crud; -import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; public class MeasurementInfo { public MeasurementInfo() {} - public MeasurementInfo(MeasurementType measurementType) { - this.measurementType = measurementType; + public MeasurementInfo(String measurement) { + this.measurement = measurement; } + private String measurement; + // select s1, s2 as speed from root, then s2 -> speed private String measurementAlias; - // to record different kinds of measurement - private MeasurementType measurementType; - - // to record the real type of the measurement, used for actual query - private TSDataType measurementDataType; - // to record the datatype of the column in the result set private TSDataType columnDataType; - public void setMeasurementAlias(String measurementAlias) { - this.measurementAlias = measurementAlias; + public void setMeasurement(String measurement) { + this.measurement = measurement; } - public String getMeasurementAlias() { - return measurementAlias; + public String getMeasurement() { + return measurement; } - public void setMeasurementType(MeasurementType measurementType) { - this.measurementType = measurementType; - } - - public MeasurementType getMeasurementType() { - return measurementType; - } - - public void setMeasurementDataType(TSDataType measurementDataType) { - this.measurementDataType = measurementDataType; + public void setMeasurementAlias(String measurementAlias) { + this.measurementAlias = measurementAlias; } - public TSDataType getMeasurementDataType() { - return measurementDataType; + public String getMeasurementAlias() { + return measurementAlias; } public void setColumnDataType(TSDataType columnDataType) { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java index e947144..83da2c0 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java @@ -74,14 +74,11 @@ public abstract class QueryPlan extends PhysicalPlan { @Override public void setPaths(List<PartialPath> paths) { - if (paths == null) this.paths = null; // align by device - else { - List<MeasurementPath> measurementPaths = new ArrayList<>(); - for (PartialPath path : paths) { - measurementPaths.add((MeasurementPath) path); - } - this.paths = measurementPaths; + List<MeasurementPath> measurementPaths = new ArrayList<>(); + for (PartialPath path : paths) { + measurementPaths.add((MeasurementPath) path); } + this.paths = measurementPaths; } public List<TSDataType> getDataTypes() { diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java index 620bfae..ce957cb 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java @@ -19,20 +19,15 @@ package org.apache.iotdb.db.query.dataset; import org.apache.iotdb.db.exception.StorageEngineException; -import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.QueryProcessException; -import org.apache.iotdb.db.metadata.path.MeasurementPath; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan; -import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType; import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan; import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan; -import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.executor.IQueryRouter; -import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.rpc.RedirectException; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -48,8 +43,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; /** This QueryDataSet is used for ALIGN_BY_DEVICE query result. */ public class AlignByDeviceDataSet extends QueryDataSet { @@ -60,9 +53,10 @@ public class AlignByDeviceDataSet extends QueryDataSet { private IExpression expression; private List<String> measurements; - private List<PartialPath> devices; + private List<PartialPath> paths; + private List<String> aggregations; + private Map<String, List<Integer>> deviceToPathIndex; private Map<String, IExpression> deviceToFilterMap; - private Map<String, MeasurementInfo> measurementInfoMap; private GroupByTimePlan groupByTimePlan; private FillQueryPlan fillQueryPlan; @@ -70,24 +64,26 @@ public class AlignByDeviceDataSet extends QueryDataSet { private RawDataQueryPlan rawDataQueryPlan; private boolean curDataSetInitialized; - private PartialPath currentDevice; private QueryDataSet currentDataSet; - private Iterator<PartialPath> deviceIterator; + private Iterator<String> deviceIterator; + private String currentDevice; private List<String> executeColumns; private int pathsNum = 0; public AlignByDeviceDataSet( AlignByDevicePlan alignByDevicePlan, QueryContext context, IQueryRouter queryRouter) { - super(null, alignByDevicePlan.getDataTypes()); + super(null, null); // align by device's column number is different from other datasets // TODO I don't know whether it's right or not in AlignedPath, remember to check here while // adapting AlignByDevice query for new vector - super.columnNum = alignByDevicePlan.getDataTypes().size(); + super.columnNum = alignByDevicePlan.getMeasurements().size() + 1; // + 1 for 'device' this.measurements = alignByDevicePlan.getMeasurements(); - this.devices = alignByDevicePlan.getDevices(); - this.measurementInfoMap = alignByDevicePlan.getMeasurementInfoMap(); + this.paths = alignByDevicePlan.getDeduplicatePaths(); + this.aggregations = alignByDevicePlan.getAggregations(); this.queryRouter = queryRouter; this.context = context; + this.deviceIterator = alignByDevicePlan.getDeviceToPathIndex().keySet().iterator(); + this.deviceToPathIndex = alignByDevicePlan.getDeviceToPathIndex(); this.deviceToFilterMap = alignByDevicePlan.getDeviceToFilterMap(); switch (alignByDevicePlan.getOperatorType()) { @@ -115,7 +111,6 @@ public class AlignByDeviceDataSet extends QueryDataSet { } this.curDataSetInitialized = false; - this.deviceIterator = devices.iterator(); } public int getPathsNum() { @@ -133,37 +128,22 @@ public class AlignByDeviceDataSet extends QueryDataSet { while (deviceIterator.hasNext()) { currentDevice = deviceIterator.next(); - // get all measurements of current device - Map<String, MeasurementPath> measurementToPathMap = - getMeasurementsUnderGivenDevice(currentDevice); - Set<String> measurementOfGivenDevice = measurementToPathMap.keySet(); - - // extract paths and aggregations queried from all measurements - // executeColumns is for calculating rowRecord executeColumns = new ArrayList<>(); List<PartialPath> executePaths = new ArrayList<>(); List<String> executeAggregations = new ArrayList<>(); - for (Entry<String, MeasurementInfo> entry : measurementInfoMap.entrySet()) { - if (entry.getValue().getMeasurementType() != MeasurementType.Exist) { - continue; - } - String column = entry.getKey(); - String measurement = column; - if (dataSetType == DataSetType.GROUPBYTIME || dataSetType == DataSetType.AGGREGATE) { - measurement = column.substring(column.indexOf('(') + 1, column.indexOf(')')); - if (measurementOfGivenDevice.contains(measurement)) { - executeAggregations.add(column.substring(0, column.indexOf('('))); - } - } - if (measurementOfGivenDevice.contains(measurement)) { - executeColumns.add(column); - executePaths.add(measurementToPathMap.get(measurement)); + for (int i : deviceToPathIndex.get(currentDevice)) { + executePaths.add(paths.get(i)); + String executeColumn = paths.get(i).getMeasurement(); + if (aggregations != null) { + executeAggregations.add(aggregations.get(i)); + executeColumn = String.format("%s(%s)", aggregations.get(i), executeColumn); } + executeColumns.add(executeColumn); } // get filter to execute for the current device if (deviceToFilterMap != null) { - this.expression = deviceToFilterMap.get(currentDevice.getFullPath()); + this.expression = deviceToFilterMap.get(currentDevice); } // for tracing: try to calculate the number of series paths @@ -219,23 +199,6 @@ public class AlignByDeviceDataSet extends QueryDataSet { return false; } - /** Get all measurements under given device. */ - protected Map<String, MeasurementPath> getMeasurementsUnderGivenDevice(PartialPath device) - throws IOException { - try { - // TODO: Implement this method in Cluster MManager - Map<String, MeasurementPath> measurementToPathMap = new HashMap<>(); - List<MeasurementPath> measurementPaths = - IoTDB.metaManager.getAllMeasurementByDevicePath(device); - for (MeasurementPath measurementPath : measurementPaths) { - measurementToPathMap.put(measurementPath.getMeasurement(), measurementPath); - } - return measurementToPathMap; - } catch (MetadataException e) { - throw new IOException("Cannot get node from " + device, e); - } - } - @Override public RowRecord nextWithoutConstraint() throws IOException { RowRecord originRowRecord = currentDataSet.next(); @@ -243,7 +206,7 @@ public class AlignByDeviceDataSet extends QueryDataSet { RowRecord rowRecord = new RowRecord(originRowRecord.getTimestamp()); Field deviceField = new Field(TSDataType.TEXT); - deviceField.setBinaryV(new Binary(currentDevice.getFullPath())); + deviceField.setBinaryV(new Binary(currentDevice)); rowRecord.addField(deviceField); // device field should not be considered as a value field it should affect the WITHOUT NULL // judgement @@ -256,23 +219,10 @@ public class AlignByDeviceDataSet extends QueryDataSet { } for (String measurement : measurements) { - MeasurementInfo measurementInfo = measurementInfoMap.get(measurement); - switch (measurementInfo.getMeasurementType()) { - case Exist: - if (currentColumnMap.get(measurement) != null) { - rowRecord.addField(currentColumnMap.get(measurement)); - } else { - rowRecord.addField(new Field(null)); - } - break; - case NonExist: - rowRecord.addField(new Field(null)); - break; - case Constant: - Field res = new Field(TSDataType.TEXT); - res.setBinaryV(Binary.valueOf(measurement)); - rowRecord.addField(res); - break; + if (currentColumnMap.get(measurement) != null) { + rowRecord.addField(currentColumnMap.get(measurement)); + } else { + rowRecord.addField(new Field(null)); } } diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 505c5a9..53fcb0f 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -883,42 +883,27 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If // get column types and do deduplication columnTypes.add(TSDataType.TEXT.toString()); // the DEVICE column of ALIGN_BY_DEVICE result - List<TSDataType> deduplicatedColumnsType = new ArrayList<>(); - deduplicatedColumnsType.add(TSDataType.TEXT); // the DEVICE column of ALIGN_BY_DEVICE result Set<String> deduplicatedMeasurements = new LinkedHashSet<>(); - Map<String, MeasurementInfo> measurementInfoMap = plan.getMeasurementInfoMap(); - // build column header with constant and non exist column and deduplication List<String> measurements = plan.getMeasurements(); for (String measurement : measurements) { - MeasurementInfo measurementInfo = measurementInfoMap.get(measurement); + MeasurementInfo measurementInfo = plan.getMeasurementInfoMap().get(measurement); TSDataType type = TSDataType.TEXT; - switch (measurementInfo.getMeasurementType()) { - case Exist: - type = measurementInfo.getColumnDataType(); - break; - case NonExist: - case Constant: - type = TSDataType.TEXT; + String measurementName = measurement; + if (measurementInfo != null) { + type = measurementInfo.getColumnDataType(); + measurementName = measurementInfo.getMeasurementAlias(); } - String measurementAlias = measurementInfo.getMeasurementAlias(); - respColumns.add(measurementAlias != null ? measurementAlias : measurement); + respColumns.add(measurementName != null ? measurementName : measurement); columnTypes.add(type.toString()); - if (!deduplicatedMeasurements.contains(measurement)) { - deduplicatedMeasurements.add(measurement); - deduplicatedColumnsType.add(type); - } + deduplicatedMeasurements.add(measurement); } // save deduplicated measurementColumn names and types in QueryPlan for the next stage to use. // i.e., used by AlignByDeviceDataSet constructor in `fetchResults` stage. plan.setMeasurements(new ArrayList<>(deduplicatedMeasurements)); - plan.setDataTypes(deduplicatedColumnsType); - - // set these null since they are never used henceforth in ALIGN_BY_DEVICE query processing. - plan.setPaths(null); } private TSExecuteStatementResp executeSelectIntoStatement( diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java index 7467407..4e99c9f 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java @@ -330,11 +330,8 @@ public class IoTDBAlignByDeviceIT { "103,root.vehicle.d0,199,null,", "104,root.vehicle.d0,190,null,", "105,root.vehicle.d0,199,11.11,", - "106,root.vehicle.d0,null,null,", "1000,root.vehicle.d0,55555,1000.11,", "946684800000,root.vehicle.d0,100,null,", - "1,root.vehicle.d1,null,null,", - "1000,root.vehicle.d1,null,null,", }; Class.forName(Config.JDBC_DRIVER_NAME); @@ -371,7 +368,7 @@ public class IoTDBAlignByDeviceIT { Assert.assertEquals(expectedBuilder.toString(), actualBuilder.toString()); cnt++; } - Assert.assertEquals(16, cnt); + Assert.assertEquals(13, cnt); } } catch (Exception e) { e.printStackTrace();
