This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch jira-768 in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 00f33e3e6fd73400d1fef9983ad8546281a70c30 Author: HTHou <[email protected]> AuthorDate: Fri Jun 19 14:19:51 2020 +0800 sum --- .../query/dataset/groupby/GroupByTimeDataSet.java | 2 +- .../db/query/executor/AggregationExecutor.java | 22 ++++++---- .../org/apache/iotdb/db/service/TSServiceImpl.java | 2 +- .../org/apache/iotdb/db/utils/FilePathUtils.java | 51 +++++++++++++++++++--- 4 files changed, 62 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java index 224c04c..4fe932d 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java @@ -58,7 +58,7 @@ public class GroupByTimeDataSet extends QueryDataSet { } Map<Integer, String> pathIndex = new HashMap<>(); - Map<String, Float> finalPaths = FilePathUtils.getPathByLevel(plan.getPaths(), plan.getLevel(), pathIndex); + Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex); // get all records from GroupByDataSet, then we merge every record if (logger.isDebugEnabled()) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java index 94d117a..8cb4ff5 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java @@ -48,7 +48,6 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator; -import org.apache.iotdb.tsfile.utils.Pair; import java.io.IOException; import java.util.*; @@ -275,7 +274,7 @@ public class AggregationExecutor { aggregateResults.add(result); } aggregateWithValueFilter(aggregateResults, timestampGenerator, readersOfSelectedSeries); - return constructDataSet(aggregateResults, queryPlan); + return constructDataSet(aggregateResults, (AggregationPlan) queryPlan); } protected TimeGenerator getTimeGenerator(QueryContext context, RawDataQueryPlan queryPlan) throws StorageEngineException { @@ -334,26 +333,33 @@ public class AggregationExecutor { Map<Integer, String> pathIndex = new HashMap<>(); List<Path> paths = new ArrayList<>(); List<TSDataType> dataTypes = new ArrayList<>(); - Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex); RowRecord curRecord = null; switch (aggregation) { case "count": + Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex); curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex); for (int i = 0; i < finalPaths.size(); i++) { dataTypes.add(TSDataType.INT64); } break; + case "sum": + Map<String, Long> finalPathsSum = FilePathUtils.getPathByLevel(plan, pathIndex); + curRecord = FilePathUtils.mergeRecordByPath(record, finalPathsSum, pathIndex); + for (int i = 0; i < finalPathsSum.size(); i++) { + dataTypes.add(TSDataType.INT64); + } + break; case "avg": - curRecord = FilePathUtils.avgRecordByPath(record, finalPaths, pathIndex); - for (int i = 0; i < finalPaths.size(); i++) { + Map<String, Float> finalPathsAvg = FilePathUtils.getPathByLevelAvg(plan, pathIndex); + curRecord = FilePathUtils.avgRecordByPath(record, finalPathsAvg, pathIndex); + for (int i = 0; i < finalPathsAvg.size(); i++) { dataTypes.add(TSDataType.FLOAT); } break; + default: + break; } - - - dataSet = new SingleDataSet(paths, dataTypes); dataSet.setRecord(curRecord); } else { diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 8c852fc..df378d6 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -693,7 +693,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { Map<String, Long> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, null); for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { respColumns.add(((AggregationPlan) plan).getAggregations().get(0) + "(" + entry.getKey() + ")"); - columnsTypes.add(entry.getValue().toString()); + columnsTypes.add(TSDataType.INT64.toString()); } } else { getWideQueryHeaders(plan, respColumns, columnsTypes); diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java index 9a7310a..4270df8 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java @@ -98,6 +98,47 @@ public class FilePathUtils { } /** + * get paths from group by level, like root.sg1.d2.s0, root.sg1.d1.s1 + * level=1, return [root.sg1, 0] and pathIndex turns to be [[0, root.sg1], [1, root.sg1]] + * @param rawPaths + * @param level + * @param pathIndex + * @return + */ + public static Map<String, Float> getPathByLevelAvg(AggregationPlan plan, Map<Integer, String> pathIndex) { + // pathGroupByLevel -> count + Map<String, Float> finalPaths = new TreeMap<>(); + + List<Path> rawPaths = plan.getPaths(); + int level = plan.getLevel(); + int i = 0; + for (Path value : rawPaths) { + String[] tmpPath = MetaUtils.getNodeNames(value.getFullPath()); + + String key; + if (tmpPath.length <= level) { + key = value.getFullPath(); + } else { + StringBuilder path = new StringBuilder(); + for (int k = 0; k <= level; k++) { + if (k == 0) { + path.append(tmpPath[k]); + } else { + path.append(".").append(tmpPath[k]); + } + } + key = path.toString(); + } + finalPaths.putIfAbsent(key, 0F); + if (pathIndex != null) { + pathIndex.put(i++, key); + } + } + + return finalPaths; + } + + /** * merge the raw record by level, for example * raw record [timestamp, root.sg1.d1.s0, root.sg1.d1.s1, root.sg1.d2.s2], level=1 * and newRecord data is [100, 1, 1, 1] @@ -136,15 +177,15 @@ public class FilePathUtils { return tmpRecord; } - public static RowRecord avgRecordByPath(RowRecord newRecord, Map<String, Long> finalPaths, + public static RowRecord avgRecordByPath(RowRecord newRecord, Map<String, Float> finalPaths, Map<Integer, String> pathIndex) { if (newRecord.getFields().size() < finalPaths.size()) { return null; } // reset final paths - for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { - entry.setValue(0L); + for (Map.Entry<String, Float> entry : finalPaths.entrySet()) { + entry.setValue(0F); } RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp()); @@ -156,8 +197,8 @@ public class FilePathUtils { } } - for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { - tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.INT64)); + for (Map.Entry<String, Float> entry : finalPaths.entrySet()) { + tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.FLOAT)); } return tmpRecord;
