This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch bug/countBug in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit f92c3803fdeed2f77576ec85663b33218125a6c8 Author: JackieTien97 <[email protected]> AuthorDate: Tue Jun 9 14:03:03 2020 +0800 resolve conflicts --- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 2 +- .../db/qp/physical/crud/AlignByDevicePlan.java | 12 +++ .../iotdb/db/qp/strategy/PhysicalGenerator.java | 23 +++-- .../db/query/dataset/AlignByDeviceDataSet.java | 11 +- .../IoTDBAggregationAlignByDeviceIT.java | 113 +++++++++++++++++++++ 5 files changed, 148 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index 831d269..27c31e6 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -293,7 +293,7 @@ public class PlanExecutor implements IPlanExecutor { protected QueryDataSet processDataQuery(QueryPlan queryPlan, QueryContext context) throws StorageEngineException, QueryFilterOptimizationException, QueryProcessException, - IOException { + IOException, MetadataException { QueryDataSet queryDataSet; if (queryPlan instanceof AlignByDevicePlan) { queryDataSet = new AlignByDeviceDataSet((AlignByDevicePlan) queryPlan, context, queryRouter); diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java index 297c7b3..c822a53 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java @@ -35,6 +35,10 @@ public class AlignByDevicePlan extends QueryPlan { // to record different kinds of measurement private Map<String, MeasurementType> measurementTypeMap; + + // to record the real type of series + private Map<String, TSDataType> rawTypeMap; + private GroupByPlan groupByPlan; private FillQueryPlan fillQueryPlan; private AggregationPlan aggregationPlan; @@ -85,6 +89,14 @@ public class AlignByDevicePlan extends QueryPlan { this.measurementTypeMap = measurementTypeMap; } + public Map<String, TSDataType> getRawTypeMap() { + return rawTypeMap; + } + + public void setRawTypeMap(Map<String, TSDataType> rawTypeMap) { + this.rawTypeMap = rawTypeMap; + } + public GroupByPlan getGroupByPlan() { return groupByPlan; } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java index 8a1d8e3..4867753 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java @@ -276,9 +276,11 @@ public class PhysicalGenerator { } } - protected List<TSDataType> getSeriesTypes(List<String> paths, String aggregation) - throws MetadataException { - return SchemaUtils.getSeriesTypesByString(paths, aggregation); + + protected Pair<List<TSDataType>, List<TSDataType>> getSeriesTypes(List<String> paths, + String aggregation) throws MetadataException { + return new Pair<>(SchemaUtils.getSeriesTypesByString(paths, aggregation), + SchemaUtils.getSeriesTypesByString(paths, null)); } protected List<TSDataType> getSeriesTypes(List<Path> paths) throws MetadataException { @@ -322,6 +324,7 @@ public class PhysicalGenerator { } ((GroupByPlan) queryPlan) .setAggregations(queryOperator.getSelectOperator().getAggregations()); + } else if (queryOperator.isFill()) { queryPlan = new FillQueryPlan(); FilterOperator timeFilter = queryOperator.getFilterOperator(); @@ -368,6 +371,7 @@ public class PhysicalGenerator { List<String> measurements = new ArrayList<>(); // to check the same measurement of different devices having the same datatype Map<String, TSDataType> measurementDataTypeMap = new HashMap<>(); + Map<String, TSDataType> rawTypeMap = new HashMap<>(); Map<String, MeasurementType> measurementTypeMap = new HashMap<>(); List<Path> paths = new ArrayList<>(); @@ -401,7 +405,10 @@ public class PhysicalGenerator { String aggregation = originAggregations != null && !originAggregations.isEmpty() ? originAggregations.get(i) : null; - List<TSDataType> dataTypes = getSeriesTypes(actualPaths, aggregation); + Pair<List<TSDataType>, List<TSDataType>> pair = getSeriesTypes(actualPaths, + aggregation); + List<TSDataType> aggregationDataTypes = pair.left; + List<TSDataType> rawTypes = pair.right; for (int pathIdx = 0; pathIdx < actualPaths.size(); pathIdx++) { Path path = new Path(actualPaths.get(pathIdx)); @@ -414,16 +421,17 @@ public class PhysicalGenerator { } else { measurementChecked = path.getMeasurement(); } - TSDataType dataType = dataTypes.get(pathIdx); + TSDataType aggregationDataType = aggregationDataTypes.get(pathIdx); if (measurementDataTypeMap.containsKey(measurementChecked)) { - if (!dataType.equals(measurementDataTypeMap.get(measurementChecked))) { + if (!aggregationDataType.equals(measurementDataTypeMap.get(measurementChecked))) { throw new QueryProcessException( "The data types of the same measurement column should be the same across " + "devices in ALIGN_BY_DEVICE sql. For more details please refer to the " + "SQL document."); } } else { - measurementDataTypeMap.put(measurementChecked, dataType); + measurementDataTypeMap.put(measurementChecked, aggregationDataType); + rawTypeMap.put(measurementChecked, rawTypes.get(pathIdx)); } // update measurementSetOfGivenSuffix and Normal measurement @@ -465,6 +473,7 @@ public class PhysicalGenerator { alignByDevicePlan.setDevices(devices); alignByDevicePlan.setMeasurementDataTypeMap(measurementDataTypeMap); alignByDevicePlan.setMeasurementTypeMap(measurementTypeMap); + alignByDevicePlan.setRawTypeMap(rawTypeMap); alignByDevicePlan.setPaths(paths); // get deviceToFilterMap diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java index 5b4e9b0..be69936 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java @@ -62,7 +62,7 @@ public class AlignByDeviceDataSet extends QueryDataSet { private List<String> devices; private Map<String, IExpression> deviceToFilterMap; private Map<String, MeasurementType> measurementTypeMap; - private Map<String, TSDataType> measurementDataTpeMap; + private Map<String, TSDataType> measurementDataTypeMap; private GroupByPlan groupByPlan; private FillQueryPlan fillQueryPlan; @@ -76,12 +76,12 @@ public class AlignByDeviceDataSet extends QueryDataSet { private List<String> executeColumns; public AlignByDeviceDataSet(AlignByDevicePlan alignByDevicePlan, QueryContext context, - IQueryRouter queryRouter) { + IQueryRouter queryRouter) throws MetadataException { super(null, alignByDevicePlan.getDataTypes()); this.measurements = alignByDevicePlan.getMeasurements(); this.devices = alignByDevicePlan.getDevices(); - this.measurementDataTpeMap = alignByDevicePlan.getMeasurementDataTypeMap(); + this.measurementDataTypeMap = alignByDevicePlan.getMeasurementDataTypeMap(); this.queryRouter = queryRouter; this.context = context; this.deviceToFilterMap = alignByDevicePlan.getDeviceToFilterMap(); @@ -95,6 +95,7 @@ public class AlignByDeviceDataSet extends QueryDataSet { case AGGREGATION: this.dataSetType = DataSetType.AGGREGATE; this.aggregationPlan = alignByDevicePlan.getAggregationPlan(); + this.measurementDataTypeMap = alignByDevicePlan.getRawTypeMap(); break; case FILL: this.dataSetType = DataSetType.FILL; @@ -132,7 +133,7 @@ public class AlignByDeviceDataSet extends QueryDataSet { List<Path> executePaths = new ArrayList<>(); List<TSDataType> tsDataTypes = new ArrayList<>(); List<String> executeAggregations = new ArrayList<>(); - for (String column : measurementDataTpeMap.keySet()) { + for (String column : measurementDataTypeMap.keySet()) { String measurement = column; if (dataSetType == DataSetType.GROUPBY || dataSetType == DataSetType.AGGREGATE) { measurement = column.substring(column.indexOf('(') + 1, column.indexOf(')')); @@ -143,7 +144,7 @@ public class AlignByDeviceDataSet extends QueryDataSet { if (measurementOfGivenDevice.contains(measurement)) { executeColumns.add(column); executePaths.add(new Path(currentDevice, measurement)); - tsDataTypes.add(measurementDataTpeMap.get(column)); + tsDataTypes.add(measurementDataTypeMap.get(column)); } } diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationAlignByDeviceIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationAlignByDeviceIT.java new file mode 100644 index 0000000..3655fe8 --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationAlignByDeviceIT.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.integration; + +import static org.apache.iotdb.db.constant.TestConstant.count; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.jdbc.Config; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class IoTDBAggregationAlignByDeviceIT { + + private static final String[] dataSet = new String[]{ + "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(1, 1.0, 1.1, 1.2)", + "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(2, 2.0, 2.1, 2.2)", + "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(3, 3.0, 3.1, 3.2)", + "flush", + "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(1, 1.0, 1.1, 1.2)", + "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(2, 2.0, 2.1, 2.2)", + "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(3, 3.0, 3.1, 3.2)", + "flush", + "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(1, 11.0, 11.1, 11.2)", + "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(2, 12.0, 12.1, 12.2)", + "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(3, 13.0, 13.1, 13.2)", + "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(1, 11.0, 11.1, 11.2)", + "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(2, 12.0, 12.1, 12.2)", + "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(3, 13.0, 13.1, 13.2)", + }; + + @Before + public void setUp() throws Exception { + EnvironmentUtils.closeStatMonitor(); + EnvironmentUtils.envSetUp(); + Class.forName(Config.JDBC_DRIVER_NAME); + prepareData(); + } + + @After + public void tearDown() throws Exception { + EnvironmentUtils.cleanEnv(); + } + + @Test + public void test() throws SQLException { + String[] retArray = new String[]{"root.sg1.d1,3,3,3", "root.sg1.d2,3,3,3",}; + try (Connection connection = DriverManager. + getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + + boolean hasResultSet = statement + .execute("SELECT count(*) from root align by device"); + + Assert.assertTrue(hasResultSet); + + int cnt; + try (ResultSet resultSet = statement.getResultSet()) { + cnt = 0; + while (resultSet.next()) { + String ans = + resultSet.getString("Device") + "," + resultSet.getString(count("s1")) + + "," + resultSet.getString(count("s2")) + "," + resultSet + .getString(count("s3")); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(retArray.length, cnt); + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private void prepareData() { + try (Connection connection = DriverManager + .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", + "root"); + Statement statement = connection.createStatement()) { + + for (String sql : dataSet) { + statement.execute(sql); + } + + } catch (Exception e) { + e.printStackTrace(); + } + } +}
