This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch clusterVector
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c96ef56b8da18e3767d40265d9059982d6680868
Author: Alima777 <[email protected]>
AuthorDate: Mon Oct 18 10:55:58 2021 +0800

    Revert "Reimplement current function"
    
    This reverts commit 4ea643c884b2d8d4beeac3f375f45edaed3a67e1.
---
 .../iotdb/db/qp/physical/crud/AggregationPlan.java |  42 +++----
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |   4 -
 .../dataset/groupby/GroupByEngineDataSet.java      |   6 -
 .../query/dataset/groupby/GroupByTimeDataSet.java  |  42 +++----
 .../groupby/GroupByWithValueFilterDataSet.java     |  26 +++--
 .../groupby/GroupByWithoutValueFilterDataSet.java  |  25 ++---
 .../db/query/executor/AggregationExecutor.java     |  31 ++++--
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  15 ++-
 .../org/apache/iotdb/db/utils/AggregateUtils.java  | 123 ++++++++++++++++++++-
 .../db/integration/IoTDBContinuousQueryIT.java     |   2 -
 .../aggregation/IoTDBAggregationByLevelIT.java     |  16 +--
 11 files changed, 217 insertions(+), 115 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index bd26bdd..c3f3a30 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -23,7 +23,9 @@ import 
org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.factory.AggregateResultFactory;
 import org.apache.iotdb.db.utils.AggregateUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
@@ -41,7 +43,7 @@ public class AggregationPlan extends RawDataQueryPlan {
 
   private int level = -1;
   // group by level aggregation result path
-  private final Map<String, AggregateResult> groupPathsResultMap = new 
LinkedHashMap<>();
+  private final Map<String, AggregateResult> levelAggPaths = new 
LinkedHashMap<>();
 
   public AggregationPlan() {
     super();
@@ -77,38 +79,28 @@ public class AggregationPlan extends RawDataQueryPlan {
     this.level = level;
   }
 
-  public Map<String, AggregateResult> getGroupPathsResultMap() {
-    return groupPathsResultMap;
-  }
-
-  public Map<String, AggregateResult> 
groupAggResultByLevel(List<AggregateResult> aggregateResults)
-      throws QueryProcessException {
-    if (!groupPathsResultMap.isEmpty()) {
-      groupPathsResultMap.clear();
+  public Map<String, AggregateResult> getAggPathByLevel() throws 
QueryProcessException {
+    if (!levelAggPaths.isEmpty()) {
+      return levelAggPaths;
     }
+    List<PartialPath> seriesPaths = getPaths();
+    List<TSDataType> dataTypes = getDataTypes();
     try {
-      for (int i = 0; i < paths.size(); i++) {
+      for (int i = 0; i < seriesPaths.size(); i++) {
         String transformedPath =
-            AggregateUtils.generatePartialPathByLevel(
-                getDeduplicatedPaths().get(i).getFullPath(), getLevel());
-        String key = deduplicatedAggregations.get(i) + "(" + transformedPath + 
")";
-        AggregateResult result = groupPathsResultMap.get(key);
-        if (result == null) {
-          groupPathsResultMap.put(key, aggregateResults.get(i));
-        } else {
-          result.merge(aggregateResults.get(i));
-          groupPathsResultMap.put(key, result);
+            
AggregateUtils.generatePartialPathByLevel(seriesPaths.get(i).getFullPath(), 
getLevel());
+        String key = getAggregations().get(i) + "(" + transformedPath + ")";
+        if (!levelAggPaths.containsKey(key)) {
+          AggregateResult aggRet =
+              AggregateResultFactory.getAggrResultByName(
+                  getAggregations().get(i), dataTypes.get(i));
+          levelAggPaths.put(key, aggRet);
         }
       }
     } catch (IllegalPathException e) {
       throw new QueryProcessException(e.getMessage());
     }
-    return groupPathsResultMap;
-  }
-
-  @Override
-  public boolean isGroupByLevel() {
-    return level >= 0;
+    return levelAggPaths;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index 3e590e4..30c2fcd 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -115,10 +115,6 @@ public abstract class QueryPlan extends PhysicalPlan {
     pathToIndex.put(columnName, index);
   }
 
-  public boolean isGroupByLevel() {
-    return false;
-  }
-
   public void setPathToIndex(Map<String, Integer> pathToIndex) {
     this.pathToIndex = pathToIndex;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
index 690f4a6..7194633 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.query.dataset.groupby;
 
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
-import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.utils.TestOnly;
@@ -51,7 +50,6 @@ public abstract class GroupByEngineDataSet extends 
QueryDataSet {
   private boolean isSlidingStepByMonth = false;
   protected int intervalTimes;
   private static final long MS_TO_MONTH = 30 * 86400_000L;
-  protected AggregateResult[] curAggregateResults;
 
   public GroupByEngineDataSet() {}
 
@@ -175,10 +173,6 @@ public abstract class GroupByEngineDataSet extends 
QueryDataSet {
     return startTime;
   }
 
-  public AggregateResult[] getCurAggregateResults() {
-    return curAggregateResults;
-  }
-
   @TestOnly
   public Pair<Long, Long> nextTimePartition() {
     hasCachedTimeInterval = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
index 5780e88..2416387 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.utils.AggregateUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
@@ -33,7 +35,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -44,49 +45,50 @@ public class GroupByTimeDataSet extends QueryDataSet {
   private List<RowRecord> records = new ArrayList<>();
   private int index = 0;
 
+  protected long queryId;
   private GroupByTimePlan groupByTimePlan;
-  private final QueryContext context;
+  private QueryContext context;
 
   public GroupByTimeDataSet(
       QueryContext context, GroupByTimePlan plan, GroupByEngineDataSet dataSet)
       throws QueryProcessException, IOException {
-    this.context = context;
+    this.queryId = context.getQueryId();
     this.paths = new ArrayList<>(plan.getDeduplicatedPaths());
     this.dataTypes = plan.getDeduplicatedDataTypes();
     this.groupByTimePlan = plan;
+    this.context = context;
 
     if (logger.isDebugEnabled()) {
       logger.debug("paths " + this.paths + " level:" + plan.getLevel());
     }
 
+    Map<String, AggregateResult> finalPaths = plan.getAggPathByLevel();
+
     // get all records from GroupByDataSet, then we merge every record
     if (logger.isDebugEnabled()) {
       logger.debug("only group by level, paths:" + groupByTimePlan.getPaths());
     }
-
-    this.paths = new ArrayList<>();
-    this.dataTypes = new ArrayList<>();
-    Map<String, AggregateResult> groupPathResultMap;
     while (dataSet != null && dataSet.hasNextWithoutConstraint()) {
       RowRecord rawRecord = dataSet.nextWithoutConstraint();
       RowRecord curRecord = new RowRecord(rawRecord.getTimestamp());
-      groupPathResultMap =
-          
plan.groupAggResultByLevel(Arrays.asList(dataSet.getCurAggregateResults()));
-      for (AggregateResult resultData : groupPathResultMap.values()) {
-        curRecord.addField(resultData.getResult(), 
resultData.getResultDataType());
+      List<AggregateResult> mergedAggResults =
+          AggregateUtils.mergeRecordByPath(plan, rawRecord, finalPaths);
+      for (AggregateResult resultData : mergedAggResults) {
+        TSDataType dataType = resultData.getResultDataType();
+        curRecord.addField(resultData.getResult(), dataType);
       }
       records.add(curRecord);
+    }
 
-      if (paths.isEmpty()) {
-        for (Map.Entry<String, AggregateResult> entry : 
groupPathResultMap.entrySet()) {
-          try {
-            this.paths.add(new PartialPath(entry.getKey()));
-          } catch (IllegalPathException e) {
-            logger.error("Query result IllegalPathException occurred: {}.", 
entry.getKey());
-          }
-          this.dataTypes.add(entry.getValue().getResultDataType());
-        }
+    this.dataTypes = new ArrayList<>();
+    this.paths = new ArrayList<>();
+    for (Map.Entry<String, AggregateResult> entry : finalPaths.entrySet()) {
+      try {
+        this.paths.add(new PartialPath(entry.getKey()));
+      } catch (IllegalPathException e) {
+        logger.error("Query result IllegalPathException occurred: {}.", 
entry.getKey());
       }
+      this.dataTypes.add(entry.getValue().getResultDataType());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 2d136da..e3e8a2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -128,13 +128,13 @@ public class GroupByWithValueFilterDataSet extends 
GroupByEngineDataSet {
           "need to call hasNext() before calling next()" + " in 
GroupByWithoutValueFilterDataSet.");
     }
     hasCachedTimeInterval = false;
-    curAggregateResults = new AggregateResult[paths.size()];
+    List<AggregateResult> aggregateResultList = new ArrayList<>();
     for (int i = 0; i < paths.size(); i++) {
-      curAggregateResults[i] =
+      aggregateResultList.add(
           AggregateResultFactory.getAggrResultByName(
               groupByTimePlan.getDeduplicatedAggregations().get(i),
               groupByTimePlan.getDeduplicatedDataTypes().get(i),
-              ascending);
+              ascending));
     }
 
     long[] timestampArray = new long[timeStampFetchSize];
@@ -145,14 +145,14 @@ public class GroupByWithValueFilterDataSet extends 
GroupByEngineDataSet {
       if (timestamp < curEndTime) {
         if (!groupByTimePlan.isAscending() && timestamp < curStartTime) {
           cachedTimestamps.addFirst(timestamp);
-          return constructRowRecord(curAggregateResults);
+          return constructRowRecord(aggregateResultList);
         }
         if (timestamp >= curStartTime) {
           timestampArray[timeArrayLength++] = timestamp;
         }
       } else {
         cachedTimestamps.addFirst(timestamp);
-        return constructRowRecord(curAggregateResults);
+        return constructRowRecord(aggregateResultList);
       }
     }
 
@@ -162,8 +162,9 @@ public class GroupByWithValueFilterDataSet extends 
GroupByEngineDataSet {
 
       // cal result using timestamp array
       for (int i = 0; i < paths.size(); i++) {
-        curAggregateResults[i].updateResultUsingTimestamps(
-            timestampArray, timeArrayLength, allDataReaderList.get(i));
+        aggregateResultList
+            .get(i)
+            .updateResultUsingTimestamps(timestampArray, timeArrayLength, 
allDataReaderList.get(i));
       }
 
       timeArrayLength = 0;
@@ -177,11 +178,12 @@ public class GroupByWithValueFilterDataSet extends 
GroupByEngineDataSet {
     if (timeArrayLength > 0) {
       // cal result using timestamp array
       for (int i = 0; i < paths.size(); i++) {
-        curAggregateResults[i].updateResultUsingTimestamps(
-            timestampArray, timeArrayLength, allDataReaderList.get(i));
+        aggregateResultList
+            .get(i)
+            .updateResultUsingTimestamps(timestampArray, timeArrayLength, 
allDataReaderList.get(i));
       }
     }
-    return constructRowRecord(curAggregateResults);
+    return constructRowRecord(aggregateResultList);
   }
 
   @Override
@@ -266,7 +268,7 @@ public class GroupByWithValueFilterDataSet extends 
GroupByEngineDataSet {
     return timeArrayLength;
   }
 
-  private RowRecord constructRowRecord(AggregateResult[] aggregateResultList) {
+  private RowRecord constructRowRecord(List<AggregateResult> 
aggregateResultList) {
     RowRecord record;
     if (leftCRightO) {
       record = new RowRecord(curStartTime);
@@ -274,7 +276,7 @@ public class GroupByWithValueFilterDataSet extends 
GroupByEngineDataSet {
       record = new RowRecord(curEndTime - 1);
     }
     for (int i = 0; i < paths.size(); i++) {
-      AggregateResult aggregateResult = aggregateResultList[i];
+      AggregateResult aggregateResult = aggregateResultList.get(i);
       record.addField(aggregateResult.getResult(), 
aggregateResult.getResultDataType());
     }
     return record;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 29a9948..0c2025f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -135,33 +135,30 @@ public class GroupByWithoutValueFilterDataSet extends 
GroupByEngineDataSet {
       record = new RowRecord(curEndTime - 1);
     }
 
-    curAggregateResults = getNextAggregateResult();
-    for (AggregateResult res : curAggregateResults) {
-      if (res == null) {
-        record.addField(null);
-        continue;
-      }
-      record.addField(res.getResult(), res.getResultDataType());
-    }
-    return record;
-  }
+    AggregateResult[] fields = new AggregateResult[paths.size()];
 
-  private AggregateResult[] getNextAggregateResult() throws IOException {
-    curAggregateResults = new AggregateResult[paths.size()];
     try {
       for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry : 
pathExecutors.entrySet()) {
         GroupByExecutor executor = pathToExecutorEntry.getValue();
         List<AggregateResult> aggregations = executor.calcResult(curStartTime, 
curEndTime);
         for (int i = 0; i < aggregations.size(); i++) {
           int resultIndex = 
resultIndexes.get(pathToExecutorEntry.getKey()).get(i);
-          curAggregateResults[resultIndex] = aggregations.get(i);
+          fields[resultIndex] = aggregations.get(i);
         }
       }
     } catch (QueryProcessException e) {
       logger.error("GroupByWithoutValueFilterDataSet execute has error", e);
       throw new IOException(e.getMessage(), e);
     }
-    return curAggregateResults;
+
+    for (AggregateResult res : fields) {
+      if (res == null) {
+        record.addField(null);
+        continue;
+      }
+      record.addField(res.getResult(), res.getResultDataType());
+    }
+    return record;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 1e8990c..80031c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -43,6 +43,7 @@ import 
org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.VectorSeriesAggregateReader;
 import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
+import org.apache.iotdb.db.utils.AggregateUtils;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -694,28 +695,36 @@ public class AggregationExecutor {
   private QueryDataSet constructDataSet(
       List<AggregateResult> aggregateResultList, AggregationPlan plan)
       throws QueryProcessException {
-    SingleDataSet dataSet;
     RowRecord record = new RowRecord(0);
+    for (AggregateResult resultData : aggregateResultList) {
+      TSDataType dataType = resultData.getResultDataType();
+      record.addField(resultData.getResult(), dataType);
+    }
 
+    SingleDataSet dataSet;
     if (plan.getLevel() >= 0) {
-      Map<String, AggregateResult> groupPathsResultMap =
-          plan.groupAggResultByLevel(aggregateResultList);
+      Map<String, AggregateResult> finalPaths = plan.getAggPathByLevel();
+
+      List<AggregateResult> mergedAggResults =
+          AggregateUtils.mergeRecordByPath(plan, aggregateResultList, 
finalPaths);
 
       List<PartialPath> paths = new ArrayList<>();
       List<TSDataType> dataTypes = new ArrayList<>();
-      for (AggregateResult resultData : groupPathsResultMap.values()) {
-        dataTypes.add(resultData.getResultDataType());
-        record.addField(resultData.getResult(), 
resultData.getResultDataType());
+      for (int i = 0; i < mergedAggResults.size(); i++) {
+        dataTypes.add(mergedAggResults.get(i).getResultDataType());
       }
-      dataSet = new SingleDataSet(paths, dataTypes);
-    } else {
-      for (AggregateResult resultData : aggregateResultList) {
+      RowRecord curRecord = new RowRecord(0);
+      for (AggregateResult resultData : mergedAggResults) {
         TSDataType dataType = resultData.getResultDataType();
-        record.addField(resultData.getResult(), dataType);
+        curRecord.addField(resultData.getResult(), dataType);
       }
+
+      dataSet = new SingleDataSet(paths, dataTypes);
+      dataSet.setRecord(curRecord);
+    } else {
       dataSet = new SingleDataSet(selectedSeries, dataTypes);
+      dataSet.setRecord(record);
     }
-    dataSet.setRecord(record);
 
     return dataSet;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 2753527..7d3850a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -807,8 +807,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
       if (plan instanceof ShowPlan || plan instanceof AuthorPlan) {
         resp = getListDataSetHeaders(newDataSet);
-      } else if (plan instanceof UDFPlan
-          || (plan instanceof QueryPlan && ((QueryPlan) 
plan).isGroupByLevel())) {
+      } else if (plan instanceof UDFPlan) {
         resp = getQueryColumnHeaders(plan, username, isJdbcQuery);
       }
 
@@ -893,7 +892,7 @@ public class TSServiceImpl implements TSIService.Iface {
   /** get ResultSet schema */
   private TSExecuteStatementResp getQueryColumnHeaders(
       PhysicalPlan physicalPlan, String username, boolean isJdbcQuery)
-      throws AuthException, TException, MetadataException {
+      throws AuthException, TException, QueryProcessException, 
MetadataException {
 
     List<String> respColumns = new ArrayList<>();
     List<String> columnsTypes = new ArrayList<>();
@@ -917,11 +916,11 @@ public class TSServiceImpl implements TSIService.Iface {
       // because the query dataset and query id is different although the 
header of last query is
       // same.
       return StaticResps.LAST_RESP.deepCopy();
-    } else if (plan.isGroupByLevel()) {
-      for (Map.Entry<String, AggregateResult> groupPathResult :
-          ((AggregationPlan) plan).getGroupPathsResultMap().entrySet()) {
-        respColumns.add(groupPathResult.getKey());
-        
columnsTypes.add(groupPathResult.getValue().getResultDataType().toString());
+    } else if (plan instanceof AggregationPlan && ((AggregationPlan) 
plan).getLevel() >= 0) {
+      Map<String, AggregateResult> finalPaths = ((AggregationPlan) 
plan).getAggPathByLevel();
+      for (Map.Entry<String, AggregateResult> entry : finalPaths.entrySet()) {
+        respColumns.add(entry.getKey());
+        columnsTypes.add(entry.getValue().getResultDataType().toString());
       }
     } else {
       List<String> respSgColumns = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/AggregateUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/AggregateUtils.java
index f16decd..2e12442 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/AggregateUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/AggregateUtils.java
@@ -20,8 +20,23 @@ package org.apache.iotdb.db.utils;
 
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metadata.utils.MetaUtils;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.aggregation.impl.AvgAggrResult;
+import org.apache.iotdb.db.query.factory.AggregateResultFactory;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 public class AggregateUtils {
 
@@ -37,20 +52,118 @@ public class AggregateUtils {
   public static String generatePartialPathByLevel(String originalPath, int 
pathLevel)
       throws IllegalPathException {
     String[] tmpPath = MetaUtils.splitPathToDetachedPath(originalPath);
-    if (pathLevel >= tmpPath.length - 1) {
+    if (tmpPath.length <= pathLevel) {
       return originalPath;
     }
     StringBuilder transformedPath = new StringBuilder();
     transformedPath.append(tmpPath[0]);
     for (int k = 1; k < tmpPath.length - 1; k++) {
-      transformedPath.append(TsFileConstant.PATH_SEPARATOR);
-      if (k == pathLevel) {
-        transformedPath.append(tmpPath[k]);
+      if (k <= pathLevel) {
+        
transformedPath.append(TsFileConstant.PATH_SEPARATOR).append(tmpPath[k]);
       } else {
-        transformedPath.append(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD);
+        transformedPath
+            .append(TsFileConstant.PATH_SEPARATOR)
+            .append(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD);
       }
     }
     
transformedPath.append(TsFileConstant.PATH_SEPARATOR).append(tmpPath[tmpPath.length
 - 1]);
     return transformedPath.toString();
   }
+
+  /**
+   * merge the raw record by level, for example raw record [timestamp, 
root.sg1.d1.s0,
+   * root.sg1.d1.s1, root.sg1.d2.s2], level=1 and newRecord data is [100, 1, 
1, 1] return [100, 3]
+   *
+   * @param newRecord
+   * @param finalPaths
+   * @return
+   */
+  public static List<AggregateResult> mergeRecordByPath(
+      AggregationPlan plan, RowRecord newRecord, Map<String, AggregateResult> 
finalPaths)
+      throws QueryProcessException {
+    if (newRecord.getFields().size() < finalPaths.size()) {
+      return Collections.emptyList();
+    }
+    List<AggregateResult> aggregateResultList = new ArrayList<>();
+    for (int i = 0; i < newRecord.getFields().size(); i++) {
+      if (newRecord.getFields().get(i) == null) {
+        aggregateResultList.add(
+            AggregateResultFactory.getAggrResultByName(
+                plan.getDeduplicatedAggregations().get(i), 
plan.getDeduplicatedDataTypes().get(i)));
+      } else {
+        TSDataType dataType = newRecord.getFields().get(i).getDataType();
+        AggregateResult aggRet =
+            AggregateResultFactory.getAggrResultByName(
+                plan.getDeduplicatedAggregations().get(i), dataType);
+        if (aggRet.getAggregationType().equals(AggregationType.AVG)) {
+          ((AvgAggrResult) aggRet)
+              .setAvgResult(dataType, 
newRecord.getFields().get(i).getDoubleV());
+        } else {
+          switch (dataType) {
+            case TEXT:
+              aggRet.setBinaryValue(newRecord.getFields().get(i).getBinaryV());
+              break;
+            case INT32:
+              aggRet.setIntValue(newRecord.getFields().get(i).getIntV());
+              break;
+            case INT64:
+              aggRet.setLongValue(newRecord.getFields().get(i).getLongV());
+              break;
+            case FLOAT:
+              aggRet.setFloatValue(newRecord.getFields().get(i).getFloatV());
+              break;
+            case DOUBLE:
+              aggRet.setDoubleValue(newRecord.getFields().get(i).getDoubleV());
+              break;
+            case BOOLEAN:
+              aggRet.setBooleanValue(newRecord.getFields().get(i).getBoolV());
+              break;
+            default:
+              throw new UnSupportedDataTypeException(dataType.toString());
+          }
+        }
+        aggregateResultList.add(aggRet);
+      }
+    }
+    return mergeRecordByPath(plan, aggregateResultList, finalPaths);
+  }
+
+  public static List<AggregateResult> mergeRecordByPath(
+      AggregationPlan plan,
+      List<AggregateResult> aggResults,
+      Map<String, AggregateResult> finalPaths)
+      throws QueryProcessException {
+    if (aggResults.size() < finalPaths.size()) {
+      return Collections.emptyList();
+    }
+    for (Map.Entry<String, AggregateResult> entry : finalPaths.entrySet()) {
+      finalPaths.put(entry.getKey(), null);
+    }
+
+    List<AggregateResult> resultSet = new ArrayList<>();
+    List<PartialPath> dupPaths = plan.getDeduplicatedPaths();
+    try {
+      for (int i = 0; i < aggResults.size(); i++) {
+        if (aggResults.get(i) != null) {
+          String transformedPath =
+              generatePartialPathByLevel(dupPaths.get(i).getFullPath(), 
plan.getLevel());
+          String key = plan.getDeduplicatedAggregations().get(i) + "(" + 
transformedPath + ")";
+          AggregateResult tempAggResult = finalPaths.get(key);
+          if (tempAggResult == null) {
+            finalPaths.put(key, aggResults.get(i));
+          } else {
+            tempAggResult.merge(aggResults.get(i));
+            finalPaths.put(key, tempAggResult);
+          }
+        }
+      }
+    } catch (IllegalPathException e) {
+      throw new QueryProcessException(e.getMessage());
+    }
+
+    for (Map.Entry<String, AggregateResult> entry : finalPaths.entrySet()) {
+      resultSet.add(entry.getValue());
+    }
+    return resultSet;
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
index 1b31aa7..00ead26 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.sql.Connection;
@@ -42,7 +41,6 @@ import java.util.stream.Collectors;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-@Ignore
 public class IoTDBContinuousQueryIT {
 
   private Statement statement;
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationByLevelIT.java b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationByLevelIT.java
index 30cd497..dbedae7 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationByLevelIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationByLevelIT.java
@@ -188,13 +188,13 @@ public class IoTDBAggregationByLevelIT {
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
           String ans =
-              resultSet.getString(TestConstant.max_time("root.*.d1.status"))
+              resultSet.getString(TestConstant.max_time("root.sg1.d1.status"))
                   + ","
-                  + 
resultSet.getString(TestConstant.max_time("root.*.d2.status"))
+                  + 
resultSet.getString(TestConstant.max_time("root.sg1.d2.status"))
                   + ","
-                  + 
resultSet.getString(TestConstant.count("root.*.d1.temperature"))
+                  + 
resultSet.getString(TestConstant.count("root.sg1.d1.temperature"))
                   + ","
-                  + 
resultSet.getString(TestConstant.count("root.*.d2.temperature"));
+                  + 
resultSet.getString(TestConstant.count("root.sg1.d2.temperature"));
           Assert.assertEquals(retArray[cnt], ans);
           cnt++;
         }
@@ -232,13 +232,13 @@ public class IoTDBAggregationByLevelIT {
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
           String ans =
-              
resultSet.getString(TestConstant.last_value("root.*.d1.temperature"))
+              
resultSet.getString(TestConstant.last_value("root.sg1.d1.temperature"))
                   + ","
-                  + 
resultSet.getString(TestConstant.last_value("root.*.d2.temperature"))
+                  + 
resultSet.getString(TestConstant.last_value("root.sg1.d2.temperature"))
                   + ","
-                  + 
resultSet.getString(TestConstant.max_value("root.*.d1.temperature"))
+                  + 
resultSet.getString(TestConstant.max_value("root.sg1.d1.temperature"))
                   + ","
-                  + 
resultSet.getString(TestConstant.max_value("root.*.d2.temperature"));
+                  + 
resultSet.getString(TestConstant.max_value("root.sg1.d2.temperature"));
           Assert.assertEquals(retArray[cnt], ans);
           cnt++;
         }

Reply via email to