This is an automated email from the ASF dual-hosted git repository.
leirui pushed a commit to branch research/M4-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/research/M4-visualization by
this push:
new b75e364275 fix
b75e364275 is described below
commit b75e364275d5cc4d058463ef70cd6ced64c3c188
Author: Lei Rui <[email protected]>
AuthorDate: Wed Jun 29 17:24:13 2022 +0800
fix
---
.../dataset/groupby/LocalGroupByExecutor4CPV.java | 128 ++++++++++++++++-----
1 file changed, 100 insertions(+), 28 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
index ab1b77081d..d2aa192be6 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.tsfile.read.reader.BatchDataIterator;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.io.IOException;
import java.util.ArrayList;
@@ -214,6 +215,69 @@ public class LocalGroupByExecutor4CPV implements
GroupByExecutor {
return results;
}
+ /**
+  * Applies the chunk's delete intervals to its already-loaded BatchData. Every point whose
+  * timestamp is not covered by a delete interval is copied into a fresh BatchData and folded into
+  * freshly created statistics; both are then stored back (the BatchData on chunkSuit4CPV, the
+  * statistics on its chunk metadata). Does nothing when no BatchData has been loaded.
+  *
+  * @param chunkSuit4CPV chunk whose BatchData and chunk-metadata statistics are rebuilt
+  * @param dataType value type of the series; INT32, INT64, FLOAT and DOUBLE are handled
+  * @throws UnSupportedDataTypeException if a surviving point has any other data type
+  */
+ private void updateBatchData(ChunkSuit4CPV chunkSuit4CPV, TSDataType dataType) {
+   if (chunkSuit4CPV.getBatchData() != null) {
+     BatchData batchData1 = BatchDataFactory.createBatchData(dataType, true, false);
+     // Stays null when dataType is not one of the four numeric types handled below.
+     Statistics statistics = null;
+     // Dispatch on the dataType parameter (not the tsDataType field) so that this switch, the
+     // BatchData created above and the per-point switch below always agree on the type.
+     switch (dataType) {
+       case INT32:
+         statistics = new IntegerStatistics();
+         break;
+       case INT64:
+         statistics = new LongStatistics();
+         break;
+       case FLOAT:
+         statistics = new FloatStatistics();
+         break;
+       case DOUBLE:
+         statistics = new DoubleStatistics();
+         break;
+       default:
+         break;
+     }
+     // The delete intervals are not modified in this method, so fetch them once up front.
+     List<TimeRange> deleteIntervalList =
+         chunkSuit4CPV.getChunkMetadata().getDeleteIntervalList();
+     BatchDataIterator batchDataIterator = chunkSuit4CPV.getBatchData().getBatchDataIterator();
+     while (batchDataIterator.hasNextTimeValuePair()) {
+       TimeValuePair timeValuePair = batchDataIterator.nextTimeValuePair();
+       long timestamp = timeValuePair.getTimestamp();
+       TsPrimitiveType value = timeValuePair.getValue();
+       // A point is dropped as soon as any delete interval contains its timestamp.
+       boolean isDeletedItself = false;
+       if (deleteIntervalList != null) {
+         for (TimeRange timeRange : deleteIntervalList) {
+           if (timeRange.contains(timestamp)) {
+             isDeletedItself = true;
+             break;
+           }
+         }
+       }
+       if (!isDeletedItself) {
+         // Keep the surviving point and account for it in the rebuilt statistics.
+         switch (dataType) {
+           case INT32:
+             batchData1.putInt(timestamp, value.getInt());
+             statistics.update(timestamp, value.getInt());
+             break;
+           case INT64:
+             batchData1.putLong(timestamp, value.getLong());
+             statistics.update(timestamp, value.getLong());
+             break;
+           case FLOAT:
+             batchData1.putFloat(timestamp, value.getFloat());
+             statistics.update(timestamp, value.getFloat());
+             break;
+           case DOUBLE:
+             batchData1.putDouble(timestamp, value.getDouble());
+             statistics.update(timestamp, value.getDouble());
+             break;
+           default:
+             throw new UnSupportedDataTypeException(String.valueOf(dataType));
+         }
+       }
+     }
+     // Replace the old data and statistics with the versions that reflect the deletes.
+     chunkSuit4CPV.setBatchData(batchData1);
+     chunkSuit4CPV.getChunkMetadata().setStatistics(statistics);
+   }
+ }
+
+
private void calculateBottomPoint(
List<ChunkSuit4CPV> currentChunkList,
long startTime,
@@ -261,20 +325,24 @@ public class LocalGroupByExecutor4CPV implements
GroupByExecutor {
// load,则对所有块进行load,应用deleteIntervals,并把BP删掉(因为不管是被删除删掉还是被更新删掉都是删掉这个点)
if (nonLazyLoad.size() == 0) {
for (ChunkSuit4CPV chunkSuit4CPV : candidateSet) {
- currentChunkList.remove(chunkSuit4CPV); // TODO check this
- List<IPageReader> pageReaderList =
- FileLoaderUtils.loadPageReaderList(
- chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
- for (IPageReader pageReader : pageReaderList) { // assume only one
page in a chunk
- ((PageReader) pageReader)
- .split4CPV(
- startTime,
- endTime,
- interval,
- curStartTime,
- currentChunkList,
- null,
- chunkSuit4CPV.getChunkMetadata());
+ if (chunkSuit4CPV.getBatchData() == null) {
+ currentChunkList.remove(chunkSuit4CPV); // TODO check this
+ List<IPageReader> pageReaderList =
+ FileLoaderUtils.loadPageReaderList(
+ chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
+ for (IPageReader pageReader : pageReaderList) { // assume only
one page in a chunk
+ ((PageReader) pageReader)
+ .split4CPV(
+ startTime,
+ endTime,
+ interval,
+ curStartTime,
+ currentChunkList,
+ null,
+ chunkSuit4CPV.getChunkMetadata());
+ }
+ } else { // 已经load过,比如一开始被M4 interval分开,现在因为update而candidate失效
+ updateBatchData(chunkSuit4CPV, tsDataType);
}
}
break; // 退出循环2,进入循环1
@@ -455,20 +523,24 @@ public class LocalGroupByExecutor4CPV implements
GroupByExecutor {
// load,则对所有块进行load,应用deleteIntervals,并把TP删掉(因为不管是被删除删掉还是被更新删掉都是删掉这个点)
if (nonLazyLoad.size() == 0) {
for (ChunkSuit4CPV chunkSuit4CPV : candidateSet) {
- currentChunkList.remove(chunkSuit4CPV); // TODO check this
- List<IPageReader> pageReaderList =
- FileLoaderUtils.loadPageReaderList(
- chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
- for (IPageReader pageReader : pageReaderList) { // assume only one
page in a chunk
- ((PageReader) pageReader)
- .split4CPV(
- startTime,
- endTime,
- interval,
- curStartTime,
- currentChunkList,
- null,
- chunkSuit4CPV.getChunkMetadata());
+ if (chunkSuit4CPV.getBatchData() == null) {
+ currentChunkList.remove(chunkSuit4CPV); // TODO check this
+ List<IPageReader> pageReaderList =
+ FileLoaderUtils.loadPageReaderList(
+ chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
+ for (IPageReader pageReader : pageReaderList) { // assume only
one page in a chunk
+ ((PageReader) pageReader)
+ .split4CPV(
+ startTime,
+ endTime,
+ interval,
+ curStartTime,
+ currentChunkList,
+ null,
+ chunkSuit4CPV.getChunkMetadata());
+ }
+ } else { // 已经load过,比如一开始被M4 interval分开,现在因为update而candidate失效
+ updateBatchData(chunkSuit4CPV, tsDataType);
}
}
break; // 退出循环2,进入循环1