This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch jira-768 in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 991515bddf1c03272185bb6a590ceecf1a0dcca3 Author: HTHou <[email protected]> AuthorDate: Thu Jun 18 15:41:48 2020 +0800 jira 768 --- .../db/query/executor/AggregationExecutor.java | 30 +++++++++---- .../org/apache/iotdb/db/service/TSServiceImpl.java | 4 +- .../org/apache/iotdb/db/utils/FilePathUtils.java | 52 +++++++++++++++------- 3 files changed, 59 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java index aee2237..94d117a 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java @@ -48,6 +48,7 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator; +import org.apache.iotdb.tsfile.utils.Pair; import java.io.IOException; import java.util.*; @@ -319,27 +320,40 @@ public class AggregationExecutor { * * @param aggregateResultList aggregate result list */ - private QueryDataSet constructDataSet(List<AggregateResult> aggregateResultList, RawDataQueryPlan plan) { + private QueryDataSet constructDataSet(List<AggregateResult> aggregateResultList, AggregationPlan plan) { RowRecord record = new RowRecord(0); for (AggregateResult resultData : aggregateResultList) { TSDataType dataType = resultData.getResultDataType(); record.addField(resultData.getResult(), dataType); } + String aggregation = plan.getAggregations().get(0); SingleDataSet dataSet = null; - if (((AggregationPlan)plan).getLevel() >= 0) { + if (plan.getLevel() >= 0) { // current only support count operation Map<Integer, String> pathIndex = new HashMap<>(); - Map<String, TSDataType> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, pathIndex); - - RowRecord curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex); - List<Path> paths = new ArrayList<>(); List<TSDataType> dataTypes = new ArrayList<>(); - for (int i = 0; i < finalPaths.size(); i++) { - dataTypes.add(TSDataType.DOUBLE); + Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex); + RowRecord curRecord = null; + switch (aggregation) { + case "count": + curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex); + for (int i = 0; i < finalPaths.size(); i++) { + dataTypes.add(TSDataType.INT64); + } + break; + case "avg": + curRecord = FilePathUtils.avgRecordByPath(record, finalPaths, pathIndex); + for (int i = 0; i < finalPaths.size(); i++) { + dataTypes.add(TSDataType.FLOAT); + } + break; } + + + dataSet = new SingleDataSet(paths, dataTypes); dataSet.setRecord(curRecord); } else { diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 3b51b3c..8c852fc 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -690,8 +690,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { // because the query dataset and query id is different although the header of last query is same. return StaticResps.LAST_RESP.deepCopy(); } else if (plan instanceof AggregationPlan && ((AggregationPlan)plan).getLevel() >= 0) { - Map<String, TSDataType> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, null); - for (Map.Entry<String, TSDataType> entry : finalPaths.entrySet()) { + Map<String, Long> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, null); + for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { respColumns.add(((AggregationPlan) plan).getAggregations().get(0) + "(" + entry.getKey() + ")"); columnsTypes.add(entry.getValue().toString()); } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java index 84b11e2..9a7310a 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java @@ -64,13 +64,12 @@ public class FilePathUtils { * @param pathIndex * @return */ - public static Map<String, TSDataType> getPathByLevel(AggregationPlan plan, Map<Integer, String> pathIndex) { + public static Map<String, Long> getPathByLevel(AggregationPlan plan, Map<Integer, String> pathIndex) { // pathGroupByLevel -> count - Map<String, TSDataType> finalPaths = new TreeMap<>(); + Map<String, Long> finalPaths = new TreeMap<>(); List<Path> rawPaths = plan.getPaths(); int level = plan.getLevel(); - String aggregation = plan.getAggregations().get(0); int i = 0; for (Path value : rawPaths) { String[] tmpPath = MetaUtils.getNodeNames(value.getFullPath()); @@ -89,15 +88,7 @@ public class FilePathUtils { } key = path.toString(); } - switch (aggregation) { - case "sum" : - finalPaths.putIfAbsent(key, TSDataType.INT64); - break; - case "avg" : - finalPaths.putIfAbsent(key, TSDataType.INT64); - break; - } - finalPaths.putIfAbsent(key, (float) 0); + finalPaths.putIfAbsent(key, 0L); if (pathIndex != null) { pathIndex.put(i++, key); } @@ -118,15 +109,42 @@ public class FilePathUtils { * @return */ public static RowRecord mergeRecordByPath(RowRecord newRecord, - Map<String, Float> finalPaths, + Map<String, Long> finalPaths, Map<Integer, String> pathIndex) { if (newRecord.getFields().size() < finalPaths.size()) { return null; } // reset final paths - for (Map.Entry<String, Float> entry : finalPaths.entrySet()) { - entry.setValue((float) 0); + for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { + entry.setValue(0L); + } + + RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp()); + + for (int i = 0; i < newRecord.getFields().size(); i++) { + if (newRecord.getFields().get(i) != null) { + finalPaths.put(pathIndex.get(i), + finalPaths.get(pathIndex.get(i)) + newRecord.getFields().get(i).getLongV()); + } + } + + for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { + tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.INT64)); + } + + return tmpRecord; + } + + public static RowRecord avgRecordByPath(RowRecord newRecord, Map<String, Long> finalPaths, + Map<Integer, String> pathIndex) { + if (newRecord.getFields().size() < finalPaths.size()) { + return null; + } + + // reset final paths + for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { + entry.setValue(0L); } RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp()); @@ -138,8 +156,8 @@ public class FilePathUtils { } } - for (Map.Entry<String, Float> entry : finalPaths.entrySet()) { - tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.FLOAT)); + for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { + tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.INT64)); } return tmpRecord;
