This is an automated email from the ASF dual-hosted git repository.

leirui pushed a commit to branch research/M4-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/research/M4-visualization by 
this push:
     new b75e364275 fix
b75e364275 is described below

commit b75e364275d5cc4d058463ef70cd6ced64c3c188
Author: Lei Rui <[email protected]>
AuthorDate: Wed Jun 29 17:24:13 2022 +0800

    fix
---
 .../dataset/groupby/LocalGroupByExecutor4CPV.java  | 128 ++++++++++++++++-----
 1 file changed, 100 insertions(+), 28 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
index ab1b77081d..d2aa192be6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.tsfile.read.reader.BatchDataIterator;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
 import org.apache.iotdb.tsfile.read.reader.page.PageReader;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -214,6 +215,69 @@ public class LocalGroupByExecutor4CPV implements 
GroupByExecutor {
     return results;
   }
 
+  /** 对BatchData应用deletes操作,获得更新的BatchData和statistics赋值到chunkSuit4CPV中 */
+  private void updateBatchData(ChunkSuit4CPV chunkSuit4CPV, TSDataType 
dataType) {
+    if (chunkSuit4CPV.getBatchData() != null) {
+      BatchData batchData1 = BatchDataFactory.createBatchData(dataType, true, 
false);
+      Statistics statistics = null;
+      switch (tsDataType) {
+        case INT32:
+          statistics = new IntegerStatistics();
+          break;
+        case INT64:
+          statistics = new LongStatistics();
+          break;
+        case FLOAT:
+          statistics = new FloatStatistics();
+          break;
+        case DOUBLE:
+          statistics = new DoubleStatistics();
+          break;
+        default:
+          break;
+      }
+      BatchDataIterator batchDataIterator = 
chunkSuit4CPV.getBatchData().getBatchDataIterator();
+      while (batchDataIterator.hasNextTimeValuePair()) {
+        TimeValuePair timeValuePair = batchDataIterator.nextTimeValuePair();
+        long timestamp = timeValuePair.getTimestamp();
+        TsPrimitiveType value = timeValuePair.getValue();
+        boolean isDeletedItself = false;
+        if (chunkSuit4CPV.getChunkMetadata().getDeleteIntervalList() != null) {
+          for (TimeRange timeRange : 
chunkSuit4CPV.getChunkMetadata().getDeleteIntervalList()) {
+            if (timeRange.contains(timestamp)) {
+              isDeletedItself = true;
+              break;
+            }
+          }
+        }
+        if (!isDeletedItself) {
+          switch (dataType) {
+            case INT32:
+              batchData1.putInt(timestamp, value.getInt());
+              statistics.update(timestamp, value.getInt());
+              break;
+            case INT64:
+              batchData1.putLong(timestamp, value.getLong());
+              statistics.update(timestamp, value.getLong());
+              break;
+            case FLOAT:
+              batchData1.putFloat(timestamp, value.getFloat());
+              statistics.update(timestamp, value.getFloat());
+              break;
+            case DOUBLE:
+              batchData1.putDouble(timestamp, value.getDouble());
+              statistics.update(timestamp, value.getDouble());
+              break;
+            default:
+              throw new UnSupportedDataTypeException(String.valueOf(dataType));
+          }
+        }
+      }
+      chunkSuit4CPV.setBatchData(batchData1);
+      chunkSuit4CPV.getChunkMetadata().setStatistics(statistics);
+    }
+  }
+
   private void calculateBottomPoint(
       List<ChunkSuit4CPV> currentChunkList,
       long startTime,
@@ -261,20 +325,24 @@ public class LocalGroupByExecutor4CPV implements 
GroupByExecutor {
         // load,则对所有块进行load,应用deleteIntervals,并把BP删掉(因为不管是被删除删掉还是被更新删掉都是删掉这个点)
         if (nonLazyLoad.size() == 0) {
           for (ChunkSuit4CPV chunkSuit4CPV : candidateSet) {
-            currentChunkList.remove(chunkSuit4CPV); // TODO check this
-            List<IPageReader> pageReaderList =
-                FileLoaderUtils.loadPageReaderList(
-                    chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
-            for (IPageReader pageReader : pageReaderList) { // assume only one 
page in a chunk
-              ((PageReader) pageReader)
-                  .split4CPV(
-                      startTime,
-                      endTime,
-                      interval,
-                      curStartTime,
-                      currentChunkList,
-                      null,
-                      chunkSuit4CPV.getChunkMetadata());
+            if (chunkSuit4CPV.getBatchData() == null) {
+              currentChunkList.remove(chunkSuit4CPV); // TODO check this
+              List<IPageReader> pageReaderList =
+                  FileLoaderUtils.loadPageReaderList(
+                      chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
+              for (IPageReader pageReader : pageReaderList) { // assume only 
one page in a chunk
+                ((PageReader) pageReader)
+                    .split4CPV(
+                        startTime,
+                        endTime,
+                        interval,
+                        curStartTime,
+                        currentChunkList,
+                        null,
+                        chunkSuit4CPV.getChunkMetadata());
+              }
+            } else { // 已经load过,比如一开始被M4 interval分开,现在因为update而candidate失效
+              updateBatchData(chunkSuit4CPV, tsDataType);
             }
           }
           break; // 退出循环2,进入循环1
@@ -455,20 +523,24 @@ public class LocalGroupByExecutor4CPV implements 
GroupByExecutor {
         // load,则对所有块进行load,应用deleteIntervals,并把TP删掉(因为不管是被删除删掉还是被更新删掉都是删掉这个点)
         if (nonLazyLoad.size() == 0) {
           for (ChunkSuit4CPV chunkSuit4CPV : candidateSet) {
-            currentChunkList.remove(chunkSuit4CPV); // TODO check this
-            List<IPageReader> pageReaderList =
-                FileLoaderUtils.loadPageReaderList(
-                    chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
-            for (IPageReader pageReader : pageReaderList) { // assume only one 
page in a chunk
-              ((PageReader) pageReader)
-                  .split4CPV(
-                      startTime,
-                      endTime,
-                      interval,
-                      curStartTime,
-                      currentChunkList,
-                      null,
-                      chunkSuit4CPV.getChunkMetadata());
+            if (chunkSuit4CPV.getBatchData() == null) {
+              currentChunkList.remove(chunkSuit4CPV); // TODO check this
+              List<IPageReader> pageReaderList =
+                  FileLoaderUtils.loadPageReaderList(
+                      chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
+              for (IPageReader pageReader : pageReaderList) { // assume only 
one page in a chunk
+                ((PageReader) pageReader)
+                    .split4CPV(
+                        startTime,
+                        endTime,
+                        interval,
+                        curStartTime,
+                        currentChunkList,
+                        null,
+                        chunkSuit4CPV.getChunkMetadata());
+              }
+            } else { // 已经load过,比如一开始被M4 interval分开,现在因为update而candidate失效
+              updateBatchData(chunkSuit4CPV, tsDataType);
             }
           }
           break; // 退出循环2,进入循环1

Reply via email to