This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch jira-768
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit c8116115209a30cd800d1929e72d714968cda854
Author: HTHou <[email protected]>
AuthorDate: Wed Jun 17 09:15:40 2020 +0800

    aggregation group by level
---
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    | 17 ------------
 .../query/dataset/groupby/GroupByTimeDataSet.java  |  8 +-----
 .../db/query/executor/AggregationExecutor.java     |  4 +--
 .../iotdb/db/query/executor/QueryRouter.java       |  1 -
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  8 +++---
 .../org/apache/iotdb/db/utils/FilePathUtils.java   | 30 +++++++++++++++-------
 6 files changed, 28 insertions(+), 40 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 00c8e7c..48abb5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -362,15 +362,6 @@ public class PhysicalGenerator {
           .setAggregations(queryOperator.getSelectOperator().getAggregations());
 
       ((GroupByTimePlan) queryPlan).setLevel(queryOperator.getLevel());
-
-      if (queryOperator.getLevel() >= 0) {
-        for (int i = 0; i < queryOperator.getSelectOperator().getAggregations().size(); i++) {
-          if (!SQLConstant.COUNT
-              .equals(queryOperator.getSelectOperator().getAggregations().get(i))) {
-            throw new QueryProcessException("group by level only support count now.");
-          }
-        }
-      }
     } else if (queryOperator.isFill()) {
       queryPlan = new FillQueryPlan();
       FilterOperator timeFilter = queryOperator.getFilterOperator();
@@ -385,14 +376,6 @@ public class PhysicalGenerator {
       ((AggregationPlan) queryPlan).setLevel(queryOperator.getLevel());
       ((AggregationPlan) queryPlan)
           .setAggregations(queryOperator.getSelectOperator().getAggregations());
-      if (queryOperator.getLevel() >= 0) {
-        for (int i = 0; i < queryOperator.getSelectOperator().getAggregations().size(); i++) {
-          if (!SQLConstant.COUNT
-              .equals(queryOperator.getSelectOperator().getAggregations().get(i))) {
-            throw new QueryProcessException("group by level only support count now.");
-          }
-        }
-      }
     } else if (queryOperator.isLastQuery()) {
       queryPlan = new LastQueryPlan();
     } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
index b655709..224c04c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
@@ -22,16 +22,10 @@ package org.apache.iotdb.db.query.dataset.groupby;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
-import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.factory.AggregateResultFactory;
-import org.apache.iotdb.db.query.filter.TsFileFilter;
 import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,7 +58,7 @@ public class GroupByTimeDataSet extends QueryDataSet {
     }
 
     Map<Integer, String> pathIndex = new HashMap<>();
-    Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan.getPaths(), plan.getLevel(), pathIndex);
+    Map<String, Float> finalPaths = FilePathUtils.getPathByLevel(plan.getPaths(), plan.getLevel(), pathIndex);
 
     // get all records from GroupByDataSet, then we merge every record
     if (logger.isDebugEnabled()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index e7f51cd..aee2237 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -330,14 +330,14 @@ public class AggregationExecutor {
     if (((AggregationPlan)plan).getLevel() >= 0) {
       // current only support count operation
       Map<Integer, String> pathIndex = new HashMap<>();
-      Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan.getDeduplicatedPaths(), ((AggregationPlan)plan).getLevel(), pathIndex);
+      Map<String, TSDataType> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, pathIndex);
 
       RowRecord curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex);
 
       List<Path> paths = new ArrayList<>();
       List<TSDataType> dataTypes = new ArrayList<>();
       for (int i = 0; i < finalPaths.size(); i++) {
-        dataTypes.add(TSDataType.INT64);
+        dataTypes.add(TSDataType.DOUBLE);
       }
 
       dataSet = new SingleDataSet(paths, dataTypes);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index 4d2070f..6b421be 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.*;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.dataset.SingleDataSet;
 import org.apache.iotdb.db.query.dataset.groupby.*;
 import org.apache.iotdb.db.query.executor.fill.IFill;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 11e3ac3..2f424ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -690,10 +690,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       // because the query dataset and query id is different although the header of last query is same.
       return StaticResps.LAST_RESP.deepCopy();
     } else if (plan instanceof AggregationPlan && ((AggregationPlan)plan).getLevel() >= 0) {
-      Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(((AggregationPlan)plan).getDeduplicatedPaths(), ((AggregationPlan)plan).getLevel(), null);
-      for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
-        respColumns.add("count(" + entry.getKey() + ")");
-        columnsTypes.add(TSDataType.INT64.toString());
+      Map<String, TSDataType> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, null);
+      for (Map.Entry<String, TSDataType> entry : finalPaths.entrySet()) {
+        respColumns.add(((AggregationPlan) plan).getAggregations().get(0) + "(" + entry.getKey() + ")");
+        columnsTypes.add(entry.getValue().toString());
       }
     } else {
       getWideQueryHeaders(plan, respColumns, columnsTypes);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index bb4ff27..84b11e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -25,6 +25,7 @@ import java.util.TreeMap;
 
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.metadata.MetaUtils;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -63,10 +64,13 @@ public class FilePathUtils {
    * @param pathIndex
    * @return
    */
-  public static Map<String, Long> getPathByLevel(List<Path> rawPaths, int level, Map<Integer, String> pathIndex) {
+  public static Map<String, TSDataType> getPathByLevel(AggregationPlan plan, Map<Integer, String> pathIndex) {
     // pathGroupByLevel -> count
-    Map<String, Long> finalPaths = new TreeMap<>();
+    Map<String, TSDataType> finalPaths = new TreeMap<>();
 
+    List<Path> rawPaths = plan.getPaths();
+    int level = plan.getLevel();
+    String aggregation = plan.getAggregations().get(0);
     int i = 0;
     for (Path value : rawPaths) {
       String[] tmpPath = MetaUtils.getNodeNames(value.getFullPath());
@@ -85,7 +89,15 @@ public class FilePathUtils {
         }
         key = path.toString();
       }
-      finalPaths.putIfAbsent(key, 0L);
+      switch (aggregation) {
+        case "sum" :
+          finalPaths.putIfAbsent(key, TSDataType.INT64);
+          break;
+        case "avg" :
+          finalPaths.putIfAbsent(key, TSDataType.INT64);
+          break;
+      }
+      finalPaths.putIfAbsent(key, (float) 0);
       if (pathIndex != null) {
         pathIndex.put(i++, key);
       }
@@ -106,15 +118,15 @@ public class FilePathUtils {
    * @return
    */
   public static RowRecord mergeRecordByPath(RowRecord newRecord,
-                                      Map<String, Long> finalPaths,
+                                      Map<String, Float> finalPaths,
                                       Map<Integer, String> pathIndex) {
     if (newRecord.getFields().size() < finalPaths.size()) {
       return null;
     }
 
     // reset final paths
-    for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
-      entry.setValue(0L);
+    for (Map.Entry<String, Float> entry : finalPaths.entrySet()) {
+      entry.setValue((float) 0);
     }
 
     RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp());
@@ -122,12 +134,12 @@ public class FilePathUtils {
     for (int i = 0; i < newRecord.getFields().size(); i++) {
       if (newRecord.getFields().get(i) != null) {
         finalPaths.put(pathIndex.get(i),
-          finalPaths.get(pathIndex.get(i)) + newRecord.getFields().get(i).getLongV());
+          finalPaths.get(pathIndex.get(i)) + newRecord.getFields().get(i).getFloatV());
       }
     }
 
-    for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
-      tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.INT64));
+    for (Map.Entry<String, Float> entry : finalPaths.entrySet()) {
+      tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.FLOAT));
     }
 
     return tmpRecord;

Reply via email to