This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch jira-768 in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 8c3b9af22bc9ed68c12a77bb4757effece2ce456 Author: HTHou <[email protected]> AuthorDate: Mon Jun 22 15:36:38 2020 +0800 support sum --- .../query/dataset/groupby/GroupByTimeDataSet.java | 3 +- .../db/query/executor/AggregationExecutor.java | 6 +-- .../org/apache/iotdb/db/service/TSServiceImpl.java | 3 +- .../org/apache/iotdb/db/utils/FilePathUtils.java | 58 +++++++++++++++++++--- 4 files changed, 57 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java index 4fe932d..e2005ce 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java @@ -65,7 +65,8 @@ public class GroupByTimeDataSet extends QueryDataSet { logger.debug("only group by level, paths:" + groupByTimePlan.getPaths()); } while (dataSet != null && dataSet.hasNextWithoutConstraint()) { - RowRecord curRecord = FilePathUtils.mergeRecordByPath(dataSet.nextWithoutConstraint(), finalPaths, pathIndex); + RowRecord curRecord = FilePathUtils + .mergeRecordByPath(dataSet.nextWithoutConstraint(), finalPaths, pathIndex, dataTypes.get(0)); if (curRecord != null) { records.add(curRecord); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java index 8cb4ff5..5895280 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java @@ -337,16 +337,16 @@ public class AggregationExecutor { switch (aggregation) { case "count": Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex); - curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex); + curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex, TSDataType.INT64); for (int i = 0; i < finalPaths.size(); i++) { dataTypes.add(TSDataType.INT64); } break; case "sum": Map<String, Long> finalPathsSum = FilePathUtils.getPathByLevel(plan, pathIndex); - curRecord = FilePathUtils.mergeRecordByPath(record, finalPathsSum, pathIndex); + curRecord = FilePathUtils.mergeRecordByPath(record, finalPathsSum, pathIndex, TSDataType.DOUBLE); for (int i = 0; i < finalPathsSum.size(); i++) { - dataTypes.add(TSDataType.INT64); + dataTypes.add(TSDataType.DOUBLE); } break; case "avg": diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index df378d6..5be346d 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -691,9 +691,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return StaticResps.LAST_RESP.deepCopy(); } else if (plan instanceof AggregationPlan && ((AggregationPlan)plan).getLevel() >= 0) { Map<String, Long> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, null); + TSDataType type = FilePathUtils.getTSDataType((AggregationPlan) plan); for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { respColumns.add(((AggregationPlan) plan).getAggregations().get(0) + "(" + entry.getKey() + ")"); - columnsTypes.add(TSDataType.INT64.toString()); + columnsTypes.add(type.toString()); } } else { getWideQueryHeaders(plan, respColumns, columnsTypes); diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java index 4270df8..2057300 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.utils; import java.io.File; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -151,32 +152,61 @@ public class FilePathUtils { */ public static RowRecord mergeRecordByPath(RowRecord newRecord, Map<String, Long> finalPaths, - Map<Integer, String> pathIndex) { + Map<Integer, String> pathIndex, + TSDataType type) { if (newRecord.getFields().size() < finalPaths.size()) { return null; } + Map<String, Object> finalPathMap = new HashMap<>(); // reset final paths - for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { - entry.setValue(0L); - } + initFinalPathMap(finalPathMap, finalPaths, type); RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp()); for (int i = 0; i < newRecord.getFields().size(); i++) { if (newRecord.getFields().get(i) != null) { - finalPaths.put(pathIndex.get(i), - finalPaths.get(pathIndex.get(i)) + newRecord.getFields().get(i).getLongV()); + finalPathMap.put(pathIndex.get(i), getValue(type, newRecord.getFields().get(i), finalPathMap.get(pathIndex.get(i)))); } } - for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { - tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.INT64)); + for (Map.Entry<String, Object> entry : finalPathMap.entrySet()) { + tmpRecord.addField(Field.getField(entry.getValue(), type)); } return tmpRecord; } + private static void initFinalPathMap(Map<String, Object> finalPathMap, Map<String, Long> finalPaths, TSDataType type) { + switch (type) { + case INT64 : + for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { + finalPathMap.put(entry.getKey(), 0L); + } + break; + case DOUBLE : + for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { + finalPathMap.put(entry.getKey(), 0D); + } + break; + default : + for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { + finalPathMap.put(entry.getKey(), 0L); + } + } + } + + private static Object getValue(TSDataType type, Field field, Object before) { + switch (type) { + case INT64 : + return ((Long) before) + field.getLongV(); + case DOUBLE : + return ((Double) before) + field.getDoubleV(); + default : + return ((Long) before) + field.getLongV(); + } + } + public static RowRecord avgRecordByPath(RowRecord newRecord, Map<String, Float> finalPaths, Map<Integer, String> pathIndex) { if (newRecord.getFields().size() < finalPaths.size()) { @@ -204,4 +234,16 @@ public class FilePathUtils { return tmpRecord; } + public static TSDataType getTSDataType(AggregationPlan plan) { + String aggregation = plan.getAggregations().get(0); + switch (aggregation) { + case "count" : + return TSDataType.INT64; + case "sum" : + return TSDataType.DOUBLE; + default : + return TSDataType.INT64; + } + } + }
