This is an automated email from the ASF dual-hosted git repository.

leirui pushed a commit to branch research/M4-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e51c3e1f554f0b685e37b808b7b06bd4da1d6e76
Author: Lei Rui <[email protected]>
AuthorDate: Sat Jan 28 12:58:03 2023 +0800

    add check
---
 .../dataset/groupby/LocalGroupByExecutor4CPV.java  | 91 +++++++++++-----------
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 48 +++++++++++-
 .../iotdb/tsfile/read/common/ChunkSuit4CPV.java    | 19 ++---
 3 files changed, 101 insertions(+), 57 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
index 6d96897bfe..1dce4eb0f3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
@@ -35,7 +35,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.ChunkSuit4CPV;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.reader.IPageReader;
 import org.apache.iotdb.tsfile.read.reader.page.PageReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -169,30 +168,23 @@ public class LocalGroupByExecutor4CPV implements 
GroupByExecutor {
         itr.remove();
         // B: loads chunk data from disk to memory
         // C: decompress page data, split time&value buffers
-        List<IPageReader> pageReaderList =
-            
FileLoaderUtils.loadPageReaderList(chunkSuit4CPV.getChunkMetadata(), 
this.timeFilter);
-        //        if (pageReaderList.size() > 1) {
-        //          throw new IOException("Against the assumption that there 
is only one page in a
-        // chunk!");
-        //        }
-        //        for (IPageReader pageReader : pageReaderList) {
-        // assume only one page in a chunk
-        // assume all data on disk, no data in memory
+        PageReader pageReader =
+            FileLoaderUtils.loadPageReaderList4CPV(
+                chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
         // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A 
CHUNK,
         //  BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
         //  OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE
-        //  STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF 
STEPREGRESS IS ASSIGN DIRECTLY),
+        //  STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF 
STEPREGRESS IS ASSIGN
+        // DIRECTLY),
         //  WHICH WILL INTRODUCE BUGS!
-        ((PageReader) pageReaderList.get(0))
-            .split4CPV(
-                startTime,
-                endTime,
-                interval,
-                curStartTime,
-                currentChunkList,
-                splitChunkList,
-                chunkMetadata);
-        //        }
+        pageReader.split4CPV(
+            startTime,
+            endTime,
+            interval,
+            curStartTime,
+            currentChunkList,
+            splitChunkList,
+            chunkMetadata);
       }
     }
   }
@@ -280,15 +272,16 @@ public class LocalGroupByExecutor4CPV implements 
GroupByExecutor {
           for (ChunkSuit4CPV chunkSuit4CPV : candidateSet) {
             // TODO 注意delete intervals的传递
             if (chunkSuit4CPV.getPageReader() == null) {
-              List<IPageReader> pageReaderList =
-                  FileLoaderUtils.loadPageReaderList(
+              PageReader pageReader =
+                  FileLoaderUtils.loadPageReaderList4CPV(
                       chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
               // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE 
PAGE IN A CHUNK,
               //  BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
               //  OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK 
WHILE
-              //  STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF 
STEPREGRESS IS ASSIGN DIRECTLY),
+              //  STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF 
STEPREGRESS IS ASSIGN
+              // DIRECTLY),
               //  WHICH WILL INTRODUCE BUGS!
-              chunkSuit4CPV.setPageReader((PageReader) pageReaderList.get(0));
+              chunkSuit4CPV.setPageReader(pageReader);
             } else {
               // TODO 注意delete intervals的传递:主要是被重写点作为点删除传递
               // pageReader does not refer to the same deleteInterval as those 
in chunkMetadata
@@ -321,8 +314,7 @@ public class LocalGroupByExecutor4CPV implements 
GroupByExecutor {
         // getCurrentChunkListFromFutureChunkList
         if (candidateTimestamp < curStartTime || candidateTimestamp >= 
curStartTime + interval) {
           isDeletedItself = true;
-        }
-        else {
+        } else {
           isDeletedItself =
               PageReader.isDeleted(
                   candidateTimestamp, 
candidate.getChunkMetadata().getDeleteIntervalList());
@@ -373,15 +365,16 @@ public class LocalGroupByExecutor4CPV implements 
GroupByExecutor {
               // scan这个chunk的数据
               // TODO chunk data read operation (a): check existence of data 
point at a timestamp
               if (chunkSuit4CPV.getPageReader() == null) {
-                List<IPageReader> pageReaderList =
-                    FileLoaderUtils.loadPageReaderList(
+                PageReader pageReader =
+                    FileLoaderUtils.loadPageReaderList4CPV(
                         chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
                 // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE 
PAGE IN A CHUNK,
                 //  BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS 
ASSUMPTION.
                 //  OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK 
WHILE
-                //  STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE 
OF STEPREGRESS IS ASSIGN DIRECTLY),
+                //  STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE 
OF STEPREGRESS IS
+                // ASSIGN DIRECTLY),
                 //  WHICH WILL INTRODUCE BUGS!
-                chunkSuit4CPV.setPageReader((PageReader) 
pageReaderList.get(0));
+                chunkSuit4CPV.setPageReader(pageReader);
               }
               isUpdate = chunkSuit4CPV.checkIfExist(candidateTimestamp);
               if (isUpdate) { // 
提前结束对overlaps块的scan,因为已经找到一个update点证明candidate失效
@@ -480,15 +473,16 @@ public class LocalGroupByExecutor4CPV implements 
GroupByExecutor {
           for (ChunkSuit4CPV chunkSuit4CPV : candidateSet) {
             // TODO 注意delete intervals的传递
             if (chunkSuit4CPV.getPageReader() == null) {
-              List<IPageReader> pageReaderList =
-                  FileLoaderUtils.loadPageReaderList(
+              PageReader pageReader =
+                  FileLoaderUtils.loadPageReaderList4CPV(
                       chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
               // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE 
PAGE IN A CHUNK,
               //  BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
               //  OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK 
WHILE
-              //  STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF 
STEPREGRESS IS ASSIGN DIRECTLY),
+              //  STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF 
STEPREGRESS IS ASSIGN
+              // DIRECTLY),
               //  WHICH WILL INTRODUCE BUGS!
-              chunkSuit4CPV.setPageReader((PageReader) pageReaderList.get(0));
+              chunkSuit4CPV.setPageReader(pageReader);
             } else {
               // TODO 注意delete intervals的传递:主要是被重写点作为点删除传递
               // pageReader does not refer to the same deleteInterval as those 
in chunkMetadata
@@ -577,15 +571,16 @@ public class LocalGroupByExecutor4CPV implements 
GroupByExecutor {
               // scan这个chunk的数据
               // TODO chunk data read operation (a): check existence of data 
point at a timestamp
               if (chunkSuit4CPV.getPageReader() == null) {
-                List<IPageReader> pageReaderList =
-                    FileLoaderUtils.loadPageReaderList(
+                PageReader pageReader =
+                    FileLoaderUtils.loadPageReaderList4CPV(
                         chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
                 // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE 
PAGE IN A CHUNK,
                 //  BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS 
ASSUMPTION.
                 //  OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK 
WHILE
-                //  STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE 
OF STEPREGRESS IS ASSIGN DIRECTLY),
+                //  STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE 
OF STEPREGRESS IS
+                // ASSIGN DIRECTLY),
                 //  WHICH WILL INTRODUCE BUGS!
-                chunkSuit4CPV.setPageReader((PageReader) 
pageReaderList.get(0));
+                chunkSuit4CPV.setPageReader(pageReader);
               }
               isUpdate = chunkSuit4CPV.checkIfExist(candidateTimestamp);
               if (isUpdate) { // 
提前结束对overlaps块的scan,因为已经找到一个update点证明candidate失效
@@ -663,15 +658,16 @@ public class LocalGroupByExecutor4CPV implements 
GroupByExecutor {
       if (susp_candidate.isLazyLoad()) { // 如果是lazy
         // load,则此时load、应用deletes、更新batchData和statistics,取消lazyLoad标记,然后回到循环1
         if (susp_candidate.getPageReader() == null) {
-          List<IPageReader> pageReaderList =
-              FileLoaderUtils.loadPageReaderList(
+          PageReader pageReader =
+              FileLoaderUtils.loadPageReaderList4CPV(
                   susp_candidate.getChunkMetadata(), this.timeFilter);
           // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN 
A CHUNK,
           //  BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
           //  OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE
-          //  STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF 
STEPREGRESS IS ASSIGN DIRECTLY),
+          //  STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF 
STEPREGRESS IS ASSIGN
+          // DIRECTLY),
           //  WHICH WILL INTRODUCE BUGS!
-          susp_candidate.setPageReader((PageReader) pageReaderList.get(0));
+          susp_candidate.setPageReader(pageReader);
         }
         // TODO update FP equal to or after statistics.getEndTime
         susp_candidate.updateFPwithTheClosetPointEqualOrAfter(
@@ -770,15 +766,16 @@ public class LocalGroupByExecutor4CPV implements 
GroupByExecutor {
       if (susp_candidate.isLazyLoad()) { // 如果是lazy
         // load,则此时load、应用deletes、更新batchData和statistics,取消lazyLoad标记,然后回到循环1
         if (susp_candidate.getPageReader() == null) {
-          List<IPageReader> pageReaderList =
-              FileLoaderUtils.loadPageReaderList(
+          PageReader pageReader =
+              FileLoaderUtils.loadPageReaderList4CPV(
                   susp_candidate.getChunkMetadata(), this.timeFilter);
           // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN 
A CHUNK,
           //  BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
           //  OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE
-          //  STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF 
STEPREGRESS IS ASSIGN DIRECTLY),
+          //  STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF 
STEPREGRESS IS ASSIGN
+          // DIRECTLY),
           //  WHICH WILL INTRODUCE BUGS!
-          susp_candidate.setPageReader((PageReader) pageReaderList.get(0));
+          susp_candidate.setPageReader(pageReader);
         }
         // TODO update FP equal to or after statistics.getEndTime
         susp_candidate.updateLPwithTheClosetPointEqualOrBefore(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java 
b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 52c7f8ff9c..7d2b9dc5cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.reader.IChunkReader;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,11 +87,11 @@ public class FileLoaderUtils {
   }
 
   /**
-   * @author Yuyuan Kang
    * @param resource TsFile
    * @param seriesPath Timeseries path
    * @param allSensors measurements queried at the same time of this device
    * @param filter any filter, only used to check time range
+   * @author Yuyuan Kang
    */
   public static TimeseriesMetadata loadTimeSeriesMetadata(
       TsFileResource resource,
@@ -191,6 +192,51 @@ public class FileLoaderUtils {
     }
   }
 
+  /**
+   * Load the single page reader of a chunk; throws IOException if the chunk has more than one page
+   *
+   * @param chunkMetaData the corresponding chunk metadata
+   * @param timeFilter it should be a TimeFilter instead of a ValueFilter
+   */
+  public static PageReader loadPageReaderList4CPV(ChunkMetadata chunkMetaData, 
Filter timeFilter)
+      throws IOException {
+    long start = System.nanoTime();
+    if (chunkMetaData == null) {
+      throw new IOException("Can't init null chunkMeta");
+    }
+    try {
+      IChunkReader chunkReader;
+      IChunkLoader chunkLoader = chunkMetaData.getChunkLoader();
+      if (chunkLoader instanceof MemChunkLoader) {
+        MemChunkLoader memChunkLoader = (MemChunkLoader) chunkLoader;
+        chunkReader = new MemChunkReader(memChunkLoader.getChunk(), 
timeFilter);
+      } else {
+        Chunk chunk = chunkLoader.loadChunk(chunkMetaData); // loads chunk 
data from disk to memory
+        chunk.setFromOldFile(chunkMetaData.isFromOldTsFile());
+        chunkReader =
+            new ChunkReader(chunk, timeFilter); // decompress page data, split 
time&value buffers
+        chunkReader.hasNextSatisfiedPage();
+      }
+      long duration = System.nanoTime() - start;
+      IOMonitor.incDataIOTime(duration);
+      List<IPageReader> pageReaderList = chunkReader.loadPageReaderList();
+      if (pageReaderList.size() > 1) {
+        // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A 
CHUNK,
+        //  BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
+        //  OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE
+        //  STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF 
STEPREGRESS IS ASSIGN
+        // DIRECTLY),
+        //  WHICH WILL INTRODUCE BUGS!
+        throw new IOException("Wrong: more than one page in a chunk!");
+      }
+      return (PageReader) pageReaderList.get(0);
+    } catch (IOException e) {
+      logger.error(
+          "Something wrong happened while reading chunk from " + 
chunkMetaData.getFilePath());
+      throw e;
+    }
+  }
+
   public static List<ChunkMetadata> getChunkMetadataList(Path path, String 
filePath)
       throws IOException {
     TsFileSequenceReader tsFileReader = 
FileReaderManager.getInstance().get(filePath, true);
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java
index cb4a691325..55116a7619 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.tsfile.read.common;
 
-import java.io.IOException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics;
@@ -31,6 +30,8 @@ import 
org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.file.metadata.statistics.StepRegress;
 import org.apache.iotdb.tsfile.read.reader.page.PageReader;
 
+import java.io.IOException;
+
 public class ChunkSuit4CPV {
 
   private ChunkMetadata chunkMetadata; // fixed info, including version, 
dataType, stepRegress
@@ -271,10 +272,10 @@ public class ChunkSuit4CPV {
     long timestamp = pageReader.timeBuffer.getLong(estimatedPos * 8);
     statistics.setStartTime(timestamp);
     switch (chunkMetadata.getDataType()) {
-      // iotdb的int类型的plain编码用的是自制的不支持random access
-      //      case INT32:
-      //        return new 
MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4),
-      //            pageReader.timeBuffer.getLong(estimatedPos * 8));
+        // iotdb的int类型的plain编码用的是自制的不支持random access
+        //      case INT32:
+        //        return new 
MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4),
+        //            pageReader.timeBuffer.getLong(estimatedPos * 8));
       case INT64:
         long longVal =
             pageReader.valueBuffer.getLong(pageReader.timeBufferLength + 
estimatedPos * 8);
@@ -332,10 +333,10 @@ public class ChunkSuit4CPV {
     long timestamp = pageReader.timeBuffer.getLong(estimatedPos * 8);
     statistics.setEndTime(timestamp);
     switch (chunkMetadata.getDataType()) {
-      // iotdb的int类型的plain编码用的是自制的不支持random access
-      //      case INT32:
-      //        return new 
MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4),
-      //            pageReader.timeBuffer.getLong(estimatedPos * 8));
+        // iotdb的int类型的plain编码用的是自制的不支持random access
+        //      case INT32:
+        //        return new 
MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4),
+        //            pageReader.timeBuffer.getLong(estimatedPos * 8));
       case INT64:
         long longVal =
             pageReader.valueBuffer.getLong(pageReader.timeBufferLength + 
estimatedPos * 8);

Reply via email to