This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch jira-768 in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit c8116115209a30cd800d1929e72d714968cda854 Author: HTHou <[email protected]> AuthorDate: Wed Jun 17 09:15:40 2020 +0800 aggregation group by level --- .../iotdb/db/qp/strategy/PhysicalGenerator.java | 17 ------------ .../query/dataset/groupby/GroupByTimeDataSet.java | 8 +----- .../db/query/executor/AggregationExecutor.java | 4 +-- .../iotdb/db/query/executor/QueryRouter.java | 1 - .../org/apache/iotdb/db/service/TSServiceImpl.java | 8 +++--- .../org/apache/iotdb/db/utils/FilePathUtils.java | 30 +++++++++++++++------- 6 files changed, 28 insertions(+), 40 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java index 00c8e7c..48abb5d 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java @@ -362,15 +362,6 @@ public class PhysicalGenerator { .setAggregations(queryOperator.getSelectOperator().getAggregations()); ((GroupByTimePlan) queryPlan).setLevel(queryOperator.getLevel()); - - if (queryOperator.getLevel() >= 0) { - for (int i = 0; i < queryOperator.getSelectOperator().getAggregations().size(); i++) { - if (!SQLConstant.COUNT - .equals(queryOperator.getSelectOperator().getAggregations().get(i))) { - throw new QueryProcessException("group by level only support count now."); - } - } - } } else if (queryOperator.isFill()) { queryPlan = new FillQueryPlan(); FilterOperator timeFilter = queryOperator.getFilterOperator(); @@ -385,14 +376,6 @@ public class PhysicalGenerator { ((AggregationPlan) queryPlan).setLevel(queryOperator.getLevel()); ((AggregationPlan) queryPlan) .setAggregations(queryOperator.getSelectOperator().getAggregations()); - if (queryOperator.getLevel() >= 0) { - for (int i = 0; i < queryOperator.getSelectOperator().getAggregations().size(); i++) { - if (!SQLConstant.COUNT - .equals(queryOperator.getSelectOperator().getAggregations().get(i))) { - throw new QueryProcessException("group by level only support count now."); - } - } - } } else if (queryOperator.isLastQuery()) { queryPlan = new LastQueryPlan(); } else { diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java index b655709..224c04c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java @@ -22,16 +22,10 @@ package org.apache.iotdb.db.query.dataset.groupby; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan; -import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.factory.AggregateResultFactory; -import org.apache.iotdb.db.query.filter.TsFileFilter; import org.apache.iotdb.db.utils.FilePathUtils; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.read.common.Field; -import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.common.RowRecord; -import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,7 +58,7 @@ public class GroupByTimeDataSet extends QueryDataSet { } Map<Integer, String> pathIndex = new HashMap<>(); - Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan.getPaths(), plan.getLevel(), pathIndex); + Map<String, Float> finalPaths = FilePathUtils.getPathByLevel(plan.getPaths(), plan.getLevel(), pathIndex); // get all records from GroupByDataSet, then we merge every record if (logger.isDebugEnabled()) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java index e7f51cd..aee2237 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java @@ -330,14 +330,14 @@ public class AggregationExecutor { if (((AggregationPlan)plan).getLevel() >= 0) { // current only support count operation Map<Integer, String> pathIndex = new HashMap<>(); - Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan.getDeduplicatedPaths(), ((AggregationPlan)plan).getLevel(), pathIndex); + Map<String, TSDataType> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, pathIndex); RowRecord curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex); List<Path> paths = new ArrayList<>(); List<TSDataType> dataTypes = new ArrayList<>(); for (int i = 0; i < finalPaths.size(); i++) { - dataTypes.add(TSDataType.INT64); + dataTypes.add(TSDataType.DOUBLE); } dataSet = new SingleDataSet(paths, dataTypes); diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java index 4d2070f..6b421be 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java @@ -23,7 +23,6 @@ import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.qp.physical.crud.*; import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.dataset.SingleDataSet; import org.apache.iotdb.db.query.dataset.groupby.*; import org.apache.iotdb.db.query.executor.fill.IFill; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 11e3ac3..2f424ab 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -690,10 +690,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { // because the query dataset and query id is different although the header of last query is same. return StaticResps.LAST_RESP.deepCopy(); } else if (plan instanceof AggregationPlan && ((AggregationPlan)plan).getLevel() >= 0) { - Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(((AggregationPlan)plan).getDeduplicatedPaths(), ((AggregationPlan)plan).getLevel(), null); - for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { - respColumns.add("count(" + entry.getKey() + ")"); - columnsTypes.add(TSDataType.INT64.toString()); + Map<String, TSDataType> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, null); + for (Map.Entry<String, TSDataType> entry : finalPaths.entrySet()) { + respColumns.add(((AggregationPlan) plan).getAggregations().get(0) + "(" + entry.getKey() + ")"); + columnsTypes.add(entry.getValue().toString()); } } else { getWideQueryHeaders(plan, respColumns, columnsTypes); diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java index bb4ff27..84b11e2 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java @@ -25,6 +25,7 @@ import java.util.TreeMap; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.metadata.MetaUtils; +import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Field; import org.apache.iotdb.tsfile.read.common.Path; @@ -63,10 +64,13 @@ public class FilePathUtils { * @param pathIndex * @return */ - public static Map<String, Long> getPathByLevel(List<Path> rawPaths, int level, Map<Integer, String> pathIndex) { + public static Map<String, TSDataType> getPathByLevel(AggregationPlan plan, Map<Integer, String> pathIndex) { // pathGroupByLevel -> count - Map<String, Long> finalPaths = new TreeMap<>(); + Map<String, TSDataType> finalPaths = new TreeMap<>(); + List<Path> rawPaths = plan.getPaths(); + int level = plan.getLevel(); + String aggregation = plan.getAggregations().get(0); int i = 0; for (Path value : rawPaths) { String[] tmpPath = MetaUtils.getNodeNames(value.getFullPath()); @@ -85,7 +89,15 @@ public class FilePathUtils { } key = path.toString(); } - finalPaths.putIfAbsent(key, 0L); + switch (aggregation) { + case "sum" : + finalPaths.putIfAbsent(key, TSDataType.INT64); + break; + case "avg" : + finalPaths.putIfAbsent(key, TSDataType.INT64); + break; + } + finalPaths.putIfAbsent(key, (float) 0); if (pathIndex != null) { pathIndex.put(i++, key); } @@ -106,15 +118,15 @@ public class FilePathUtils { * @return */ public static RowRecord mergeRecordByPath(RowRecord newRecord, - Map<String, Long> finalPaths, + Map<String, Float> finalPaths, Map<Integer, String> pathIndex) { if (newRecord.getFields().size() < finalPaths.size()) { return null; } // reset final paths - for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { - entry.setValue(0L); + for (Map.Entry<String, Float> entry : finalPaths.entrySet()) { + entry.setValue((float) 0); } RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp()); @@ -122,12 +134,12 @@ public class FilePathUtils { for (int i = 0; i < newRecord.getFields().size(); i++) { if (newRecord.getFields().get(i) != null) { finalPaths.put(pathIndex.get(i), - finalPaths.get(pathIndex.get(i)) + newRecord.getFields().get(i).getLongV()); + finalPaths.get(pathIndex.get(i)) + newRecord.getFields().get(i).getFloatV()); } } - for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { - tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.INT64)); + for (Map.Entry<String, Float> entry : finalPaths.entrySet()) { + tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.FLOAT)); } return tmpRecord;
