This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch clusterVector in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c96ef56b8da18e3767d40265d9059982d6680868 Author: Alima777 <[email protected]> AuthorDate: Mon Oct 18 10:55:58 2021 +0800 Revert "Reimplement current function" This reverts commit 4ea643c884b2d8d4beeac3f375f45edaed3a67e1. --- .../iotdb/db/qp/physical/crud/AggregationPlan.java | 42 +++---- .../iotdb/db/qp/physical/crud/QueryPlan.java | 4 - .../dataset/groupby/GroupByEngineDataSet.java | 6 - .../query/dataset/groupby/GroupByTimeDataSet.java | 42 +++---- .../groupby/GroupByWithValueFilterDataSet.java | 26 +++-- .../groupby/GroupByWithoutValueFilterDataSet.java | 25 ++--- .../db/query/executor/AggregationExecutor.java | 31 ++++-- .../org/apache/iotdb/db/service/TSServiceImpl.java | 15 ++- .../org/apache/iotdb/db/utils/AggregateUtils.java | 123 ++++++++++++++++++++- .../db/integration/IoTDBContinuousQueryIT.java | 2 - .../aggregation/IoTDBAggregationByLevelIT.java | 16 +-- 11 files changed, 217 insertions(+), 115 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java index bd26bdd..c3f3a30 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java @@ -23,7 +23,9 @@ import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.query.aggregation.AggregateResult; +import org.apache.iotdb.db.query.factory.AggregateResultFactory; import org.apache.iotdb.db.utils.AggregateUtils; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -41,7 +43,7 @@ public class AggregationPlan extends RawDataQueryPlan { private int level = -1; // group by level aggregation result path - private final Map<String, AggregateResult> groupPathsResultMap = new LinkedHashMap<>(); + private final Map<String, AggregateResult> levelAggPaths = new LinkedHashMap<>(); public AggregationPlan() { super(); @@ -77,38 +79,28 @@ public class AggregationPlan extends RawDataQueryPlan { this.level = level; } - public Map<String, AggregateResult> getGroupPathsResultMap() { - return groupPathsResultMap; - } - - public Map<String, AggregateResult> groupAggResultByLevel(List<AggregateResult> aggregateResults) - throws QueryProcessException { - if (!groupPathsResultMap.isEmpty()) { - groupPathsResultMap.clear(); + public Map<String, AggregateResult> getAggPathByLevel() throws QueryProcessException { + if (!levelAggPaths.isEmpty()) { + return levelAggPaths; } + List<PartialPath> seriesPaths = getPaths(); + List<TSDataType> dataTypes = getDataTypes(); try { - for (int i = 0; i < paths.size(); i++) { + for (int i = 0; i < seriesPaths.size(); i++) { String transformedPath = - AggregateUtils.generatePartialPathByLevel( - getDeduplicatedPaths().get(i).getFullPath(), getLevel()); - String key = deduplicatedAggregations.get(i) + "(" + transformedPath + ")"; - AggregateResult result = groupPathsResultMap.get(key); - if (result == null) { - groupPathsResultMap.put(key, aggregateResults.get(i)); - } else { - result.merge(aggregateResults.get(i)); - groupPathsResultMap.put(key, result); + AggregateUtils.generatePartialPathByLevel(seriesPaths.get(i).getFullPath(), getLevel()); + String key = getAggregations().get(i) + "(" + transformedPath + ")"; + if (!levelAggPaths.containsKey(key)) { + AggregateResult aggRet = + AggregateResultFactory.getAggrResultByName( + getAggregations().get(i), dataTypes.get(i)); + levelAggPaths.put(key, aggRet); } } } catch (IllegalPathException e) { throw new QueryProcessException(e.getMessage()); } - return groupPathsResultMap; - } - - @Override - public boolean isGroupByLevel() { - return level >= 0; + return levelAggPaths; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java index 3e590e4..30c2fcd 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java @@ -115,10 +115,6 @@ public abstract class QueryPlan extends PhysicalPlan { pathToIndex.put(columnName, index); } - public boolean isGroupByLevel() { - return false; - } - public void setPathToIndex(Map<String, Integer> pathToIndex) { this.pathToIndex = pathToIndex; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java index 690f4a6..7194633 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.query.dataset.groupby; import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan; -import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.SessionManager; import org.apache.iotdb.db.utils.TestOnly; @@ -51,7 +50,6 @@ public abstract class GroupByEngineDataSet extends QueryDataSet { private boolean isSlidingStepByMonth = false; protected int intervalTimes; private static final long MS_TO_MONTH = 30 * 86400_000L; - protected AggregateResult[] curAggregateResults; public GroupByEngineDataSet() {} @@ -175,10 +173,6 @@ public abstract class GroupByEngineDataSet extends QueryDataSet { return startTime; } - public AggregateResult[] getCurAggregateResults() { - return curAggregateResults; - } - @TestOnly public Pair<Long, Long> nextTimePartition() { hasCachedTimeInterval = false; diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java index 5780e88..2416387 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java @@ -25,6 +25,8 @@ import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.utils.AggregateUtils; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.RowRecord; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; @@ -33,7 +35,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; @@ -44,49 +45,50 @@ public class GroupByTimeDataSet extends QueryDataSet { private List<RowRecord> records = new ArrayList<>(); private int index = 0; + protected long queryId; private GroupByTimePlan groupByTimePlan; - private final QueryContext context; + private QueryContext context; public GroupByTimeDataSet( QueryContext context, GroupByTimePlan plan, GroupByEngineDataSet dataSet) throws QueryProcessException, IOException { - this.context = context; + this.queryId = context.getQueryId(); this.paths = new ArrayList<>(plan.getDeduplicatedPaths()); this.dataTypes = plan.getDeduplicatedDataTypes(); this.groupByTimePlan = plan; + this.context = context; if (logger.isDebugEnabled()) { logger.debug("paths " + this.paths + " level:" + plan.getLevel()); } + Map<String, AggregateResult> finalPaths = plan.getAggPathByLevel(); + // get all records from GroupByDataSet, then we merge every record if (logger.isDebugEnabled()) { logger.debug("only group by level, paths:" + groupByTimePlan.getPaths()); } - - this.paths = new ArrayList<>(); - this.dataTypes = new ArrayList<>(); - Map<String, AggregateResult> groupPathResultMap; while (dataSet != null && dataSet.hasNextWithoutConstraint()) { RowRecord rawRecord = dataSet.nextWithoutConstraint(); RowRecord curRecord = new RowRecord(rawRecord.getTimestamp()); - groupPathResultMap = - plan.groupAggResultByLevel(Arrays.asList(dataSet.getCurAggregateResults())); - for (AggregateResult resultData : groupPathResultMap.values()) { - curRecord.addField(resultData.getResult(), resultData.getResultDataType()); + List<AggregateResult> mergedAggResults = + AggregateUtils.mergeRecordByPath(plan, rawRecord, finalPaths); + for (AggregateResult resultData : mergedAggResults) { + TSDataType dataType = resultData.getResultDataType(); + curRecord.addField(resultData.getResult(), dataType); } records.add(curRecord); + } - if (paths.isEmpty()) { - for (Map.Entry<String, AggregateResult> entry : groupPathResultMap.entrySet()) { - try { - this.paths.add(new PartialPath(entry.getKey())); - } catch (IllegalPathException e) { - logger.error("Query result IllegalPathException occurred: {}.", entry.getKey()); - } - this.dataTypes.add(entry.getValue().getResultDataType()); - } + this.dataTypes = new ArrayList<>(); + this.paths = new ArrayList<>(); + for (Map.Entry<String, AggregateResult> entry : finalPaths.entrySet()) { + try { + this.paths.add(new PartialPath(entry.getKey())); + } catch (IllegalPathException e) { + logger.error("Query result IllegalPathException occurred: {}.", entry.getKey()); } + this.dataTypes.add(entry.getValue().getResultDataType()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java index 2d136da..e3e8a2b 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java @@ -128,13 +128,13 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet { "need to call hasNext() before calling next()" + " in GroupByWithoutValueFilterDataSet."); } hasCachedTimeInterval = false; - curAggregateResults = new AggregateResult[paths.size()]; + List<AggregateResult> aggregateResultList = new ArrayList<>(); for (int i = 0; i < paths.size(); i++) { - curAggregateResults[i] = + aggregateResultList.add( AggregateResultFactory.getAggrResultByName( groupByTimePlan.getDeduplicatedAggregations().get(i), groupByTimePlan.getDeduplicatedDataTypes().get(i), - ascending); + ascending)); } long[] timestampArray = new long[timeStampFetchSize]; @@ -145,14 +145,14 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet { if (timestamp < curEndTime) { if (!groupByTimePlan.isAscending() && timestamp < curStartTime) { cachedTimestamps.addFirst(timestamp); - return constructRowRecord(curAggregateResults); + return constructRowRecord(aggregateResultList); } if (timestamp >= curStartTime) { timestampArray[timeArrayLength++] = timestamp; } } else { cachedTimestamps.addFirst(timestamp); - return constructRowRecord(curAggregateResults); + return constructRowRecord(aggregateResultList); } } @@ -162,8 +162,9 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet { // cal result using timestamp array for (int i = 0; i < paths.size(); i++) { - curAggregateResults[i].updateResultUsingTimestamps( - timestampArray, timeArrayLength, allDataReaderList.get(i)); + aggregateResultList + .get(i) + .updateResultUsingTimestamps(timestampArray, timeArrayLength, allDataReaderList.get(i)); } timeArrayLength = 0; @@ -177,11 +178,12 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet { if (timeArrayLength > 0) { // cal result using timestamp array for (int i = 0; i < paths.size(); i++) { - curAggregateResults[i].updateResultUsingTimestamps( - timestampArray, timeArrayLength, allDataReaderList.get(i)); + aggregateResultList + .get(i) + .updateResultUsingTimestamps(timestampArray, timeArrayLength, allDataReaderList.get(i)); } } - return constructRowRecord(curAggregateResults); + return constructRowRecord(aggregateResultList); } @Override @@ -266,7 +268,7 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet { return timeArrayLength; } - private RowRecord constructRowRecord(AggregateResult[] aggregateResultList) { + private RowRecord constructRowRecord(List<AggregateResult> aggregateResultList) { RowRecord record; if (leftCRightO) { record = new RowRecord(curStartTime); @@ -274,7 +276,7 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet { record = new RowRecord(curEndTime - 1); } for (int i = 0; i < paths.size(); i++) { - AggregateResult aggregateResult = aggregateResultList[i]; + AggregateResult aggregateResult = aggregateResultList.get(i); record.addField(aggregateResult.getResult(), aggregateResult.getResultDataType()); } return record; diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java index 29a9948..0c2025f 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java @@ -135,33 +135,30 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { record = new RowRecord(curEndTime - 1); } - curAggregateResults = getNextAggregateResult(); - for (AggregateResult res : curAggregateResults) { - if (res == null) { - record.addField(null); - continue; - } - record.addField(res.getResult(), res.getResultDataType()); - } - return record; - } + AggregateResult[] fields = new AggregateResult[paths.size()]; - private AggregateResult[] getNextAggregateResult() throws IOException { - curAggregateResults = new AggregateResult[paths.size()]; try { for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry : pathExecutors.entrySet()) { GroupByExecutor executor = pathToExecutorEntry.getValue(); List<AggregateResult> aggregations = executor.calcResult(curStartTime, curEndTime); for (int i = 0; i < aggregations.size(); i++) { int resultIndex = resultIndexes.get(pathToExecutorEntry.getKey()).get(i); - curAggregateResults[resultIndex] = aggregations.get(i); + fields[resultIndex] = aggregations.get(i); } } } catch (QueryProcessException e) { logger.error("GroupByWithoutValueFilterDataSet execute has error", e); throw new IOException(e.getMessage(), e); } - return curAggregateResults; + + for (AggregateResult res : fields) { + if (res == null) { + record.addField(null); + continue; + } + record.addField(res.getResult(), res.getResultDataType()); + } + return record; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java index 1e8990c..80031c8 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java @@ -43,6 +43,7 @@ import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader; import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp; import org.apache.iotdb.db.query.reader.series.VectorSeriesAggregateReader; import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator; +import org.apache.iotdb.db.utils.AggregateUtils; import org.apache.iotdb.db.utils.QueryUtils; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; @@ -694,28 +695,36 @@ public class AggregationExecutor { private QueryDataSet constructDataSet( List<AggregateResult> aggregateResultList, AggregationPlan plan) throws QueryProcessException { - SingleDataSet dataSet; RowRecord record = new RowRecord(0); + for (AggregateResult resultData : aggregateResultList) { + TSDataType dataType = resultData.getResultDataType(); + record.addField(resultData.getResult(), dataType); + } + SingleDataSet dataSet; if (plan.getLevel() >= 0) { - Map<String, AggregateResult> groupPathsResultMap = - plan.groupAggResultByLevel(aggregateResultList); + Map<String, AggregateResult> finalPaths = plan.getAggPathByLevel(); + + List<AggregateResult> mergedAggResults = + AggregateUtils.mergeRecordByPath(plan, aggregateResultList, finalPaths); List<PartialPath> paths = new ArrayList<>(); List<TSDataType> dataTypes = new ArrayList<>(); - for (AggregateResult resultData : groupPathsResultMap.values()) { - dataTypes.add(resultData.getResultDataType()); - record.addField(resultData.getResult(), resultData.getResultDataType()); + for (int i = 0; i < mergedAggResults.size(); i++) { + dataTypes.add(mergedAggResults.get(i).getResultDataType()); } - dataSet = new SingleDataSet(paths, dataTypes); - } else { - for (AggregateResult resultData : aggregateResultList) { + RowRecord curRecord = new RowRecord(0); + for (AggregateResult resultData : mergedAggResults) { TSDataType dataType = resultData.getResultDataType(); - record.addField(resultData.getResult(), dataType); + curRecord.addField(resultData.getResult(), dataType); } + + dataSet = new SingleDataSet(paths, dataTypes); + dataSet.setRecord(curRecord); + } else { dataSet = new SingleDataSet(selectedSeries, dataTypes); + dataSet.setRecord(record); } - dataSet.setRecord(record); return dataSet; } diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 2753527..7d3850a 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -807,8 +807,7 @@ public class TSServiceImpl implements TSIService.Iface { if (plan instanceof ShowPlan || plan instanceof AuthorPlan) { resp = getListDataSetHeaders(newDataSet); - } else if (plan instanceof UDFPlan - || (plan instanceof QueryPlan && ((QueryPlan) plan).isGroupByLevel())) { + } else if (plan instanceof UDFPlan) { resp = getQueryColumnHeaders(plan, username, isJdbcQuery); } @@ -893,7 +892,7 @@ public class TSServiceImpl implements TSIService.Iface { /** get ResultSet schema */ private TSExecuteStatementResp getQueryColumnHeaders( PhysicalPlan physicalPlan, String username, boolean isJdbcQuery) - throws AuthException, TException, MetadataException { + throws AuthException, TException, QueryProcessException, MetadataException { List<String> respColumns = new ArrayList<>(); List<String> columnsTypes = new ArrayList<>(); @@ -917,11 +916,11 @@ public class TSServiceImpl implements TSIService.Iface { // because the query dataset and query id is different although the header of last query is // same. return StaticResps.LAST_RESP.deepCopy(); - } else if (plan.isGroupByLevel()) { - for (Map.Entry<String, AggregateResult> groupPathResult : - ((AggregationPlan) plan).getGroupPathsResultMap().entrySet()) { - respColumns.add(groupPathResult.getKey()); - columnsTypes.add(groupPathResult.getValue().getResultDataType().toString()); + } else if (plan instanceof AggregationPlan && ((AggregationPlan) plan).getLevel() >= 0) { + Map<String, AggregateResult> finalPaths = ((AggregationPlan) plan).getAggPathByLevel(); + for (Map.Entry<String, AggregateResult> entry : finalPaths.entrySet()) { + respColumns.add(entry.getKey()); + columnsTypes.add(entry.getValue().getResultDataType().toString()); } } else { List<String> respSgColumns = new ArrayList<>(); diff --git a/server/src/main/java/org/apache/iotdb/db/utils/AggregateUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/AggregateUtils.java index f16decd..2e12442 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/AggregateUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/AggregateUtils.java @@ -20,8 +20,23 @@ package org.apache.iotdb.db.utils; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.exception.metadata.IllegalPathException; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.metadata.utils.MetaUtils; +import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; +import org.apache.iotdb.db.query.aggregation.AggregateResult; +import org.apache.iotdb.db.query.aggregation.AggregationType; +import org.apache.iotdb.db.query.aggregation.impl.AvgAggrResult; +import org.apache.iotdb.db.query.factory.AggregateResultFactory; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.RowRecord; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; public class AggregateUtils { @@ -37,20 +52,118 @@ public class AggregateUtils { public static String generatePartialPathByLevel(String originalPath, int pathLevel) throws IllegalPathException { String[] tmpPath = MetaUtils.splitPathToDetachedPath(originalPath); - if (pathLevel >= tmpPath.length - 1) { + if (tmpPath.length <= pathLevel) { return originalPath; } StringBuilder transformedPath = new StringBuilder(); transformedPath.append(tmpPath[0]); for (int k = 1; k < tmpPath.length - 1; k++) { - transformedPath.append(TsFileConstant.PATH_SEPARATOR); - if (k == pathLevel) { - transformedPath.append(tmpPath[k]); + if (k <= pathLevel) { + transformedPath.append(TsFileConstant.PATH_SEPARATOR).append(tmpPath[k]); } else { - transformedPath.append(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD); + transformedPath + .append(TsFileConstant.PATH_SEPARATOR) + .append(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD); } } transformedPath.append(TsFileConstant.PATH_SEPARATOR).append(tmpPath[tmpPath.length - 1]); return transformedPath.toString(); } + + /** + * merge the raw record by level, for example raw record [timestamp, root.sg1.d1.s0, + * root.sg1.d1.s1, root.sg1.d2.s2], level=1 and newRecord data is [100, 1, 1, 1] return [100, 3] + * + * @param newRecord + * @param finalPaths + * @return + */ + public static List<AggregateResult> mergeRecordByPath( + AggregationPlan plan, RowRecord newRecord, Map<String, AggregateResult> finalPaths) + throws QueryProcessException { + if (newRecord.getFields().size() < finalPaths.size()) { + return Collections.emptyList(); + } + List<AggregateResult> aggregateResultList = new ArrayList<>(); + for (int i = 0; i < newRecord.getFields().size(); i++) { + if (newRecord.getFields().get(i) == null) { + aggregateResultList.add( + AggregateResultFactory.getAggrResultByName( + plan.getDeduplicatedAggregations().get(i), plan.getDeduplicatedDataTypes().get(i))); + } else { + TSDataType dataType = newRecord.getFields().get(i).getDataType(); + AggregateResult aggRet = + AggregateResultFactory.getAggrResultByName( + plan.getDeduplicatedAggregations().get(i), dataType); + if (aggRet.getAggregationType().equals(AggregationType.AVG)) { + ((AvgAggrResult) aggRet) + .setAvgResult(dataType, newRecord.getFields().get(i).getDoubleV()); + } else { + switch (dataType) { + case TEXT: + aggRet.setBinaryValue(newRecord.getFields().get(i).getBinaryV()); + break; + case INT32: + aggRet.setIntValue(newRecord.getFields().get(i).getIntV()); + break; + case INT64: + aggRet.setLongValue(newRecord.getFields().get(i).getLongV()); + break; + case FLOAT: + aggRet.setFloatValue(newRecord.getFields().get(i).getFloatV()); + break; + case DOUBLE: + aggRet.setDoubleValue(newRecord.getFields().get(i).getDoubleV()); + break; + case BOOLEAN: + aggRet.setBooleanValue(newRecord.getFields().get(i).getBoolV()); + break; + default: + throw new UnSupportedDataTypeException(dataType.toString()); + } + } + aggregateResultList.add(aggRet); + } + } + return mergeRecordByPath(plan, aggregateResultList, finalPaths); + } + + public static List<AggregateResult> mergeRecordByPath( + AggregationPlan plan, + List<AggregateResult> aggResults, + Map<String, AggregateResult> finalPaths) + throws QueryProcessException { + if (aggResults.size() < finalPaths.size()) { + return Collections.emptyList(); + } + for (Map.Entry<String, AggregateResult> entry : finalPaths.entrySet()) { + finalPaths.put(entry.getKey(), null); + } + + List<AggregateResult> resultSet = new ArrayList<>(); + List<PartialPath> dupPaths = plan.getDeduplicatedPaths(); + try { + for (int i = 0; i < aggResults.size(); i++) { + if (aggResults.get(i) != null) { + String transformedPath = + generatePartialPathByLevel(dupPaths.get(i).getFullPath(), plan.getLevel()); + String key = plan.getDeduplicatedAggregations().get(i) + "(" + transformedPath + ")"; + AggregateResult tempAggResult = finalPaths.get(key); + if (tempAggResult == null) { + finalPaths.put(key, aggResults.get(i)); + } else { + tempAggResult.merge(aggResults.get(i)); + finalPaths.put(key, tempAggResult); + } + } + } + } catch (IllegalPathException e) { + throw new QueryProcessException(e.getMessage()); + } + + for (Map.Entry<String, AggregateResult> entry : finalPaths.entrySet()) { + resultSet.add(entry.getValue()); + } + return resultSet; + } } diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java index 1b31aa7..00ead26 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java @@ -26,7 +26,6 @@ import org.apache.iotdb.tsfile.utils.Pair; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import java.sql.Connection; @@ -42,7 +41,6 @@ import java.util.stream.Collectors; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -@Ignore public class IoTDBContinuousQueryIT { private Statement statement; diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationByLevelIT.java b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationByLevelIT.java index 30cd497..dbedae7 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationByLevelIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationByLevelIT.java @@ -188,13 +188,13 @@ public class IoTDBAggregationByLevelIT { try (ResultSet resultSet = statement.getResultSet()) { while (resultSet.next()) { String ans = - resultSet.getString(TestConstant.max_time("root.*.d1.status")) + resultSet.getString(TestConstant.max_time("root.sg1.d1.status")) + "," - + resultSet.getString(TestConstant.max_time("root.*.d2.status")) + + resultSet.getString(TestConstant.max_time("root.sg1.d2.status")) + "," - + resultSet.getString(TestConstant.count("root.*.d1.temperature")) + + resultSet.getString(TestConstant.count("root.sg1.d1.temperature")) + "," - + resultSet.getString(TestConstant.count("root.*.d2.temperature")); + + resultSet.getString(TestConstant.count("root.sg1.d2.temperature")); Assert.assertEquals(retArray[cnt], ans); cnt++; } @@ -232,13 +232,13 @@ public class IoTDBAggregationByLevelIT { try (ResultSet resultSet = statement.getResultSet()) { while (resultSet.next()) { String ans = - resultSet.getString(TestConstant.last_value("root.*.d1.temperature")) + resultSet.getString(TestConstant.last_value("root.sg1.d1.temperature")) + "," - + resultSet.getString(TestConstant.last_value("root.*.d2.temperature")) + + resultSet.getString(TestConstant.last_value("root.sg1.d2.temperature")) + "," - + resultSet.getString(TestConstant.max_value("root.*.d1.temperature")) + + resultSet.getString(TestConstant.max_value("root.sg1.d1.temperature")) + "," - + resultSet.getString(TestConstant.max_value("root.*.d2.temperature")); + + resultSet.getString(TestConstant.max_value("root.sg1.d2.temperature")); Assert.assertEquals(retArray[cnt], ans); cnt++; }
