This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch jira-768 in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 0428bd10df8101d3e2b545fc69938fc60afbc5dd Author: HTHou <[email protected]> AuthorDate: Tue Jul 14 17:39:03 2020 +0800 support max min value and max min time --- .../query/dataset/groupby/GroupByTimeDataSet.java | 2 +- .../db/query/executor/AggregationExecutor.java | 55 +++++++-- .../org/apache/iotdb/db/service/TSServiceImpl.java | 11 +- .../org/apache/iotdb/db/utils/FilePathUtils.java | 125 ++++++++++++++++----- 4 files changed, 150 insertions(+), 43 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java index e2005ce..caf50da 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java @@ -58,7 +58,7 @@ public class GroupByTimeDataSet extends QueryDataSet { } Map<Integer, String> pathIndex = new HashMap<>(); - Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex); + Set<String> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex); // get all records from GroupByDataSet, then we merge every record if (logger.isDebugEnabled()) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java index 5895280..922621a 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java @@ -318,42 +318,79 @@ public class AggregationExecutor { * using aggregate result data list construct QueryDataSet. * * @param aggregateResultList aggregate result list + * @throws QueryProcessException */ - private QueryDataSet constructDataSet(List<AggregateResult> aggregateResultList, AggregationPlan plan) { + private QueryDataSet constructDataSet(List<AggregateResult> aggregateResultList, AggregationPlan plan) + throws QueryProcessException { RowRecord record = new RowRecord(0); for (AggregateResult resultData : aggregateResultList) { TSDataType dataType = resultData.getResultDataType(); record.addField(resultData.getResult(), dataType); } - - String aggregation = plan.getAggregations().get(0); SingleDataSet dataSet = null; if (plan.getLevel() >= 0) { - // current only support count operation + if (plan.getAggregations().size() > 1) { + //throw new QueryProcessException("Group by level doesn't support multiple aggregations"); + } + // TODO: Check data type here + + String aggregation = plan.getAggregations().get(0); + Map<Integer, String> pathIndex = new HashMap<>(); List<Path> paths = new ArrayList<>(); List<TSDataType> dataTypes = new ArrayList<>(); RowRecord curRecord = null; + Set<String> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex); switch (aggregation) { case "count": - Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex); curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex, TSDataType.INT64); for (int i = 0; i < finalPaths.size(); i++) { dataTypes.add(TSDataType.INT64); } break; case "sum": - Map<String, Long> finalPathsSum = FilePathUtils.getPathByLevel(plan, pathIndex); - curRecord = FilePathUtils.mergeRecordByPath(record, finalPathsSum, pathIndex, TSDataType.DOUBLE); - for (int i = 0; i < finalPathsSum.size(); i++) { + // Check datatype + curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex, TSDataType.DOUBLE); + for (int i = 0; i < finalPaths.size(); i++) { dataTypes.add(TSDataType.DOUBLE); } break; case "avg": + // Check datatype Map<String, Float> finalPathsAvg = FilePathUtils.getPathByLevelAvg(plan, pathIndex); curRecord = FilePathUtils.avgRecordByPath(record, finalPathsAvg, pathIndex); for (int i = 0; i < finalPathsAvg.size(); i++) { - dataTypes.add(TSDataType.FLOAT); + dataTypes.add(TSDataType.DOUBLE); + } + break; + case "max_time": + curRecord = FilePathUtils.mergeMaxOrMinByPath(record, TSDataType.INT64, finalPaths, + pathIndex, true); + for (int i = 0; i < finalPaths.size(); i++) { + dataTypes.add(TSDataType.INT64); + } + break; + case "min_time": + curRecord = FilePathUtils.mergeMaxOrMinByPath(record, TSDataType.INT64, finalPaths, + pathIndex, false); + for (int i = 0; i < finalPaths.size(); i++) { + dataTypes.add(TSDataType.INT64); + } + break; + case "max_value": + // Check datatype + curRecord = FilePathUtils.mergeMaxOrMinByPath(record, plan.getDeduplicatedDataTypes().get(0), + finalPaths, pathIndex, true); + for (int i = 0; i < finalPaths.size(); i++) { + dataTypes.add(plan.getDeduplicatedDataTypes().get(0)); + } + break; + case "min_value": + // Check datatype + curRecord = FilePathUtils.mergeMaxOrMinByPath(record, plan.getDeduplicatedDataTypes().get(0), + finalPaths, pathIndex, false); + for (int i = 0; i < finalPaths.size(); i++) { + dataTypes.add(plan.getDeduplicatedDataTypes().get(0)); } break; default: diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index ad464d4..a50d42f 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -738,12 +738,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { // Last Query should return different respond instead of the static one // because the query dataset and query id is different although the header of last query is same. return StaticResps.LAST_RESP.deepCopy(); - } else if (plan instanceof AggregationPlan && ((AggregationPlan) plan).getLevel() >= 0) { - Map<String, Long> finalPaths = FilePathUtils - .getPathByLevel(((AggregationPlan) plan).getDeduplicatedPaths(), - ((AggregationPlan) plan).getLevel(), null); - for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { - respColumns.add(((AggregationPlan) plan).getAggregations().get(0) + "(" + entry.getKey() + ")"); + } else if (plan instanceof AggregationPlan && ((AggregationPlan)plan).getLevel() >= 0) { + Set<String> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, null); + TSDataType type = FilePathUtils.getTSDataType((AggregationPlan) plan); + for (String path : finalPaths) { + respColumns.add(((AggregationPlan) plan).getAggregations().get(0) + "(" + path + ")"); columnsTypes.add(type.toString()); } } else { diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java index 2057300..ecb41b0 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java @@ -19,10 +19,11 @@ package org.apache.iotdb.db.utils; import java.io.File; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.metadata.MetaUtils; @@ -65,9 +66,9 @@ public class FilePathUtils { * @param pathIndex * @return */ - public static Map<String, Long> getPathByLevel(AggregationPlan plan, Map<Integer, String> pathIndex) { + public static Set<String> getPathByLevel(AggregationPlan plan, Map<Integer, String> pathIndex) { // pathGroupByLevel -> count - Map<String, Long> finalPaths = new TreeMap<>(); + Set<String> finalPaths = new TreeSet<>(); List<Path> rawPaths = plan.getPaths(); int level = plan.getLevel(); @@ -89,7 +90,7 @@ public class FilePathUtils { } key = path.toString(); } - finalPaths.putIfAbsent(key, 0L); + finalPaths.add(key); if (pathIndex != null) { pathIndex.put(i++, key); } @@ -151,60 +152,121 @@ public class FilePathUtils { * @return */ public static RowRecord mergeRecordByPath(RowRecord newRecord, - Map<String, Long> finalPaths, - Map<Integer, String> pathIndex, - TSDataType type) { + Set<String> finalPaths, Map<Integer, String> pathIndex, + TSDataType type) { if (newRecord.getFields().size() < finalPaths.size()) { return null; } - Map<String, Object> finalPathMap = new HashMap<>(); + Map<String, Object> finalPathMap = new TreeMap<>(); // reset final paths - initFinalPathMap(finalPathMap, finalPaths, type); + initFinalPathMap(finalPathMap, finalPaths, type, 0); RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp()); for (int i = 0; i < newRecord.getFields().size(); i++) { if (newRecord.getFields().get(i) != null) { - finalPathMap.put(pathIndex.get(i), getValue(type, newRecord.getFields().get(i), finalPathMap.get(pathIndex.get(i)))); + finalPathMap.put(pathIndex.get(i), getSum(type, newRecord.getFields().get(i), + finalPathMap.get(pathIndex.get(i)))); } } for (Map.Entry<String, Object> entry : finalPathMap.entrySet()) { tmpRecord.addField(Field.getField(entry.getValue(), type)); } - return tmpRecord; } - private static void initFinalPathMap(Map<String, Object> finalPathMap, Map<String, Long> finalPaths, TSDataType type) { + private static void initFinalPathMap(Map<String, Object> finalPathMap, + Set<String> finalPaths, TSDataType type, int initValue) { + for (String path : finalPaths) { + switch (type) { + case INT32 : + finalPathMap.put(path, initValue); + break; + case INT64 : + if (initValue == 0) { + finalPathMap.put(path, 0L); + } else if (initValue > 0) { + finalPathMap.put(path, Long.MAX_VALUE); + } else { + finalPathMap.put(path, Long.MIN_VALUE); + } + break; + case FLOAT : + if (initValue == 0) { + finalPathMap.put(path, 0F); + } else if (initValue > 0) { + finalPathMap.put(path, Float.MAX_VALUE); + } else { + finalPathMap.put(path, Float.MIN_VALUE); + } + break; + case DOUBLE : + if (initValue == 0) { + finalPathMap.put(path, 0D); + } else if (initValue > 0) { + finalPathMap.put(path, Double.MAX_VALUE); + } else { + finalPathMap.put(path, Double.MIN_VALUE); + } + break; + default : + finalPathMap.put(path, 0L); + } + } + } + + private static Object getSum(TSDataType type, Field field, Object before) { switch (type) { case INT64 : - for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { - finalPathMap.put(entry.getKey(), 0L); - } - break; + return ((Long) before) + field.getLongV(); case DOUBLE : - for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { - finalPathMap.put(entry.getKey(), 0D); - } - break; + return ((Double) before) + field.getDoubleV(); default : - for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { - finalPathMap.put(entry.getKey(), 0L); - } + return ((Long) before) + field.getLongV(); } } - private static Object getValue(TSDataType type, Field field, Object before) { + private static Object getMaxOrMin(TSDataType type, Field field, Object before, boolean isMax) { switch (type) { case INT64 : - return ((Long) before) + field.getLongV(); + return isMax ? Math.max(((Long) before), field.getLongV()) + : Math.min(((Long) before), field.getLongV()); + case INT32 : + return isMax ? Math.max(((Integer) before), field.getIntV()) + : Math.min(((Integer) before), field.getIntV()); case DOUBLE : - return ((Double) before) + field.getDoubleV(); + return isMax ? Math.max(((Double) before), field.getDoubleV()) + : Math.min(((Double) before), field.getDoubleV()); + case FLOAT : + return isMax ? Math.max(((Float) before), field.getFloatV()) + : Math.min(((Float) before), field.getFloatV()); default : - return ((Long) before) + field.getLongV(); + return isMax ? Math.max(((Long) before), field.getLongV()) + : Math.min(((Long) before), field.getLongV()); + } + } + + public static RowRecord mergeMaxOrMinByPath(RowRecord newRecord, TSDataType type, + Set<String> finalPaths, Map<Integer, String> pathIndex, boolean isMax) { + if (newRecord.getFields().size() < finalPaths.size()) { + return null; + } + Map<String, Object> finalPathMap = new TreeMap<>(); + // reset final paths + initFinalPathMap(finalPathMap, finalPaths, type, isMax ? Integer.MIN_VALUE : Integer.MAX_VALUE); + for (int i = 0; i < newRecord.getFields().size(); i++) { + if (newRecord.getFields().get(i) != null) { + finalPathMap.put(pathIndex.get(i), getMaxOrMin(type, newRecord.getFields().get(i), + finalPathMap.get(pathIndex.get(i)), isMax)); + } } + RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp()); + for (Map.Entry<String, Object> entry : finalPathMap.entrySet()) { + tmpRecord.addField(Field.getField(entry.getValue(), type)); + } + return tmpRecord; } public static RowRecord avgRecordByPath(RowRecord newRecord, Map<String, Float> finalPaths, @@ -239,8 +301,17 @@ public class FilePathUtils { switch (aggregation) { case "count" : return TSDataType.INT64; + case "avg" : case "sum" : return TSDataType.DOUBLE; + case "first_value" : + case "last_value" : + case "max_value" : + case "min_value" : + return plan.getDeduplicatedDataTypes().get(0); + case "max_time" : + case "min_time" : + return TSDataType.INT64; default : return TSDataType.INT64; }
