This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch aggrVector2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 58ca9b22570c6f0f7e235616dc3ba7f1da6b1853 Author: Alima777 <[email protected]> AuthorDate: Thu Sep 16 17:06:52 2021 +0800 implement update by batchdata --- .../db/query/executor/AggregationExecutor.java | 81 +++++++++++++--------- .../apache/iotdb/tsfile/read/common/BatchData.java | 72 +++++++++++++++++++ 2 files changed, 121 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java index dd5cd75..608d294 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java @@ -434,10 +434,7 @@ public class AggregationExecutor { remainingToCalculate = aggregateVectorPages( - seriesReader, - aggregateResultList.get(seriesReader.getCurIndex()), - isCalculatedArray.get(seriesReader.getCurIndex()), - remainingToCalculate); + seriesReader, aggregateResultList, isCalculatedArray, remainingToCalculate); if (remainingToCalculate == 0) { return; } @@ -490,19 +487,11 @@ public class AggregationExecutor { continue; } BatchData nextOverlappedPageData = seriesReader.nextPage(); - for (int i = 0; i < aggregateResultList.size(); i++) { - if (!isCalculatedArray[i]) { - AggregateResult aggregateResult = aggregateResultList.get(i); - aggregateResult.updateResultFromPageData(nextOverlappedPageData); - nextOverlappedPageData.resetBatchData(); - if (aggregateResult.hasFinalResult()) { - isCalculatedArray[i] = true; - remainingToCalculate--; - if (remainingToCalculate == 0) { - return 0; - } - } - } + remainingToCalculate = + aggregateBatchData( + aggregateResultList, isCalculatedArray, remainingToCalculate, nextOverlappedPageData); + if (remainingToCalculate == 0) { + return 0; } } return remainingToCalculate; @@ -510,8 +499,8 @@ public class AggregationExecutor { private static int aggregateVectorPages( VectorSeriesAggregateReader seriesReader, - List<AggregateResult> aggregateResultList, - boolean[] isCalculatedArray, + List<List<AggregateResult>> aggregateResultList, + List<boolean[]> isCalculatedArray, int remainingToCalculate) throws IOException, QueryProcessException { while (seriesReader.hasNextPage()) { @@ -521,7 +510,10 @@ public class AggregationExecutor { Statistics pageStatistic = seriesReader.currentPageStatistics(); remainingToCalculate = aggregateStatistics( - aggregateResultList, isCalculatedArray, remainingToCalculate, pageStatistic); + aggregateResultList.get(seriesReader.getCurIndex()), + isCalculatedArray.get(seriesReader.getCurIndex()), + remainingToCalculate, + pageStatistic); if (remainingToCalculate == 0) { seriesReader.resetIndex(); return 0; @@ -531,25 +523,50 @@ public class AggregationExecutor { seriesReader.skipCurrentPage(); continue; } + BatchData nextOverlappedPageData = seriesReader.nextPage(); - for (int i = 0; i < aggregateResultList.size(); i++) { - if (!isCalculatedArray[i]) { - AggregateResult aggregateResult = aggregateResultList.get(i); - aggregateResult.updateResultFromPageData(nextOverlappedPageData); - nextOverlappedPageData.resetBatchData(); - if (aggregateResult.hasFinalResult()) { - isCalculatedArray[i] = true; - remainingToCalculate--; - if (remainingToCalculate == 0) { - return 0; - } - } + BatchData[] subBatchData = nextOverlappedPageData.generateSubBatchData(); + while (seriesReader.getCurIndex() < seriesReader.getSubSensorSize()) { + remainingToCalculate = + aggregateBatchData( + aggregateResultList.get(seriesReader.getCurIndex()), + isCalculatedArray.get(seriesReader.getCurIndex()), + remainingToCalculate, + subBatchData[seriesReader.getCurIndex()]); + if (remainingToCalculate == 0) { + seriesReader.resetIndex(); + return 0; } + seriesReader.nextIndex(); } } return remainingToCalculate; } + private static int aggregateBatchData( + List<AggregateResult> aggregateResultList, + boolean[] isCalculatedArray, + int remainingToCalculate, + BatchData batchData) + throws QueryProcessException, IOException { + int newRemainingToCalculate = remainingToCalculate; + for (int i = 0; i < aggregateResultList.size(); i++) { + if (!isCalculatedArray[i]) { + AggregateResult aggregateResult = aggregateResultList.get(i); + aggregateResult.updateResultFromPageData(batchData); + batchData.resetBatchData(); + if (aggregateResult.hasFinalResult()) { + isCalculatedArray[i] = true; + remainingToCalculate--; + if (remainingToCalculate == 0) { + return newRemainingToCalculate; + } + } + } + } + return newRemainingToCalculate; + } + /** execute aggregate function with value filter. */ public QueryDataSet executeWithValueFilter(AggregationPlan queryPlan) throws StorageEngineException, IOException, QueryProcessException { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java index 1005f34..3f3acf3 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java @@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsInt; import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsLong; import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsVector; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -170,6 +171,10 @@ public class BatchData { return dataType; } + public void setDataType(TSDataType dataType) { + this.dataType = dataType; + } + public BatchDataType getBatchDataType() { return batchDataType; } @@ -658,6 +663,73 @@ public class BatchData { return readCurArrayIndex; } + public BatchData[] generateSubBatchData() throws IOException { + if (this.vectorRet == null) { + throw new IOException("SubBatchData can only be generated from VectorBatchData"); + } + int subSensorSize = this.vectorRet.get(0)[0].length; + BatchData[] subBatchData; + switch (batchDataType) { + case Ordinary: + subBatchData = new BatchData[subSensorSize]; + break; + case DescRead: + subBatchData = new DescReadBatchData[subSensorSize]; + break; + case DescReadWrite: + subBatchData = new DescReadWriteBatchData[subSensorSize]; + break; + default: + throw new UnSupportedDataTypeException( + String.format("BatchData type %s is not supported.", batchDataType)); + } + + // set data type for each subBatchData + TsPrimitiveType[] firstValues = getVector(); + for (int i = 0; i < subSensorSize; i++) { + TsPrimitiveType primitiveVal = firstValues[i]; + subBatchData[i].setDataType(primitiveVal.getDataType()); + } + + while (hasCurrent()) { + long currentTime = currentTime(); + TsPrimitiveType[] currentValues = getVector(); + for (int i = 0; i < currentValues.length; i++) { + TsPrimitiveType primitiveVal = currentValues[i]; + switch (primitiveVal.getDataType()) { + case INT32: + subBatchData[i].putInt(currentTime, primitiveVal.getInt()); + break; + case INT64: + subBatchData[i].putLong(currentTime, primitiveVal.getLong()); + break; + case FLOAT: + subBatchData[i].putFloat(currentTime, primitiveVal.getFloat()); + break; + case DOUBLE: + subBatchData[i].putDouble(currentTime, primitiveVal.getDouble()); + break; + case BOOLEAN: + subBatchData[i].putBoolean(currentTime, primitiveVal.getBoolean()); + break; + case TEXT: + subBatchData[i].putBinary(currentTime, primitiveVal.getBinary()); + break; + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", primitiveVal.getDataType())); + } + } + next(); + } + + resetBatchData(); + for (int i = 0; i < subSensorSize; i++) { + subBatchData[i].flip(); + } + return subBatchData; + } + /** * When put data, the writeIndex increases while the readIndex remains 0. For ascending read, we * could read from 0 to writeIndex. So no need to flip.
