This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggrVector2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 58ca9b22570c6f0f7e235616dc3ba7f1da6b1853
Author: Alima777 <[email protected]>
AuthorDate: Thu Sep 16 17:06:52 2021 +0800

    implement update by batchdata
---
 .../db/query/executor/AggregationExecutor.java     | 81 +++++++++++++---------
 .../apache/iotdb/tsfile/read/common/BatchData.java | 72 +++++++++++++++++++
 2 files changed, 121 insertions(+), 32 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index dd5cd75..608d294 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -434,10 +434,7 @@ public class AggregationExecutor {
 
         remainingToCalculate =
             aggregateVectorPages(
-                seriesReader,
-                aggregateResultList.get(seriesReader.getCurIndex()),
-                isCalculatedArray.get(seriesReader.getCurIndex()),
-                remainingToCalculate);
+                seriesReader, aggregateResultList, isCalculatedArray, 
remainingToCalculate);
         if (remainingToCalculate == 0) {
           return;
         }
@@ -490,19 +487,11 @@ public class AggregationExecutor {
         continue;
       }
       BatchData nextOverlappedPageData = seriesReader.nextPage();
-      for (int i = 0; i < aggregateResultList.size(); i++) {
-        if (!isCalculatedArray[i]) {
-          AggregateResult aggregateResult = aggregateResultList.get(i);
-          aggregateResult.updateResultFromPageData(nextOverlappedPageData);
-          nextOverlappedPageData.resetBatchData();
-          if (aggregateResult.hasFinalResult()) {
-            isCalculatedArray[i] = true;
-            remainingToCalculate--;
-            if (remainingToCalculate == 0) {
-              return 0;
-            }
-          }
-        }
+      remainingToCalculate =
+          aggregateBatchData(
+              aggregateResultList, isCalculatedArray, remainingToCalculate, 
nextOverlappedPageData);
+      if (remainingToCalculate == 0) {
+        return 0;
       }
     }
     return remainingToCalculate;
@@ -510,8 +499,8 @@ public class AggregationExecutor {
 
   private static int aggregateVectorPages(
       VectorSeriesAggregateReader seriesReader,
-      List<AggregateResult> aggregateResultList,
-      boolean[] isCalculatedArray,
+      List<List<AggregateResult>> aggregateResultList,
+      List<boolean[]> isCalculatedArray,
       int remainingToCalculate)
       throws IOException, QueryProcessException {
     while (seriesReader.hasNextPage()) {
@@ -521,7 +510,10 @@ public class AggregationExecutor {
           Statistics pageStatistic = seriesReader.currentPageStatistics();
           remainingToCalculate =
               aggregateStatistics(
-                  aggregateResultList, isCalculatedArray, 
remainingToCalculate, pageStatistic);
+                  aggregateResultList.get(seriesReader.getCurIndex()),
+                  isCalculatedArray.get(seriesReader.getCurIndex()),
+                  remainingToCalculate,
+                  pageStatistic);
           if (remainingToCalculate == 0) {
             seriesReader.resetIndex();
             return 0;
@@ -531,25 +523,50 @@ public class AggregationExecutor {
         seriesReader.skipCurrentPage();
         continue;
       }
+
       BatchData nextOverlappedPageData = seriesReader.nextPage();
-      for (int i = 0; i < aggregateResultList.size(); i++) {
-        if (!isCalculatedArray[i]) {
-          AggregateResult aggregateResult = aggregateResultList.get(i);
-          aggregateResult.updateResultFromPageData(nextOverlappedPageData);
-          nextOverlappedPageData.resetBatchData();
-          if (aggregateResult.hasFinalResult()) {
-            isCalculatedArray[i] = true;
-            remainingToCalculate--;
-            if (remainingToCalculate == 0) {
-              return 0;
-            }
-          }
+      BatchData[] subBatchData = nextOverlappedPageData.generateSubBatchData();
+      while (seriesReader.getCurIndex() < seriesReader.getSubSensorSize()) {
+        remainingToCalculate =
+            aggregateBatchData(
+                aggregateResultList.get(seriesReader.getCurIndex()),
+                isCalculatedArray.get(seriesReader.getCurIndex()),
+                remainingToCalculate,
+                subBatchData[seriesReader.getCurIndex()]);
+        if (remainingToCalculate == 0) {
+          seriesReader.resetIndex();
+          return 0;
         }
+        seriesReader.nextIndex();
       }
     }
     return remainingToCalculate;
   }
 
+  private static int aggregateBatchData(
+      List<AggregateResult> aggregateResultList,
+      boolean[] isCalculatedArray,
+      int remainingToCalculate,
+      BatchData batchData)
+      throws QueryProcessException, IOException {
+    int newRemainingToCalculate = remainingToCalculate;
+    for (int i = 0; i < aggregateResultList.size(); i++) {
+      if (!isCalculatedArray[i]) {
+        AggregateResult aggregateResult = aggregateResultList.get(i);
+        aggregateResult.updateResultFromPageData(batchData);
+        batchData.resetBatchData();
+        if (aggregateResult.hasFinalResult()) {
+          isCalculatedArray[i] = true;
+          remainingToCalculate--;
+          if (remainingToCalculate == 0) {
+            return newRemainingToCalculate;
+          }
+        }
+      }
+    }
+    return newRemainingToCalculate;
+  }
+
   /** execute aggregate function with value filter. */
   public QueryDataSet executeWithValueFilter(AggregationPlan queryPlan)
       throws StorageEngineException, IOException, QueryProcessException {
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index 1005f34..3f3acf3 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsInt;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsLong;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsVector;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -170,6 +171,10 @@ public class BatchData {
     return dataType;
   }
 
+  public void setDataType(TSDataType dataType) {
+    this.dataType = dataType;
+  }
+
   public BatchDataType getBatchDataType() {
     return batchDataType;
   }
@@ -658,6 +663,73 @@ public class BatchData {
     return readCurArrayIndex;
   }
 
+  public BatchData[] generateSubBatchData() throws IOException {
+    if (this.vectorRet == null) {
+      throw new IOException("SubBatchData can only be generated from 
VectorBatchData");
+    }
+    int subSensorSize = this.vectorRet.get(0)[0].length;
+    BatchData[] subBatchData;
+    switch (batchDataType) {
+      case Ordinary:
+        subBatchData = new BatchData[subSensorSize];
+        break;
+      case DescRead:
+        subBatchData = new DescReadBatchData[subSensorSize];
+        break;
+      case DescReadWrite:
+        subBatchData = new DescReadWriteBatchData[subSensorSize];
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("BatchData type %s is not supported.", 
batchDataType));
+    }
+
+    // set data type for each subBatchData
+    TsPrimitiveType[] firstValues = getVector();
+    for (int i = 0; i < subSensorSize; i++) {
+      TsPrimitiveType primitiveVal = firstValues[i];
+      subBatchData[i].setDataType(primitiveVal.getDataType());
+    }
+
+    while (hasCurrent()) {
+      long currentTime = currentTime();
+      TsPrimitiveType[] currentValues = getVector();
+      for (int i = 0; i < currentValues.length; i++) {
+        TsPrimitiveType primitiveVal = currentValues[i];
+        switch (primitiveVal.getDataType()) {
+          case INT32:
+            subBatchData[i].putInt(currentTime, primitiveVal.getInt());
+            break;
+          case INT64:
+            subBatchData[i].putLong(currentTime, primitiveVal.getLong());
+            break;
+          case FLOAT:
+            subBatchData[i].putFloat(currentTime, primitiveVal.getFloat());
+            break;
+          case DOUBLE:
+            subBatchData[i].putDouble(currentTime, primitiveVal.getDouble());
+            break;
+          case BOOLEAN:
+            subBatchData[i].putBoolean(currentTime, primitiveVal.getBoolean());
+            break;
+          case TEXT:
+            subBatchData[i].putBinary(currentTime, primitiveVal.getBinary());
+            break;
+          default:
+            throw new UnSupportedDataTypeException(
+                String.format("Data type %s is not supported.", 
primitiveVal.getDataType()));
+        }
+      }
+      next();
+    }
+
+    resetBatchData();
+    for (int i = 0; i < subSensorSize; i++) {
+      subBatchData[i].flip();
+    }
+    return subBatchData;
+  }
+
   /**
    * When put data, the writeIndex increases while the readIndex remains 0. 
For ascending read, we
    * could read from 0 to writeIndex. So no need to flip.

Reply via email to