This is an automated email from the ASF dual-hosted git repository.

leirui pushed a commit to branch research/M4-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d8ce4d2db9117f320a92adf8352eefcd0fccb69e
Author: Lei Rui <[email protected]>
AuthorDate: Thu Jan 26 02:42:58 2023 +0800

    query dev 70%
---
 .../dataset/groupby/LocalGroupByExecutor4CPV.java  | 178 ++++++++++++---------
 .../file/metadata/statistics/BinaryStatistics.java |   8 +
 .../metadata/statistics/BooleanStatistics.java     |   7 +
 .../file/metadata/statistics/DoubleStatistics.java |  12 +-
 .../file/metadata/statistics/FloatStatistics.java  |  12 +-
 .../metadata/statistics/IntegerStatistics.java     |  12 +-
 .../file/metadata/statistics/LongStatistics.java   | 111 +++++++++----
 .../file/metadata/statistics/Statistics.java       |  12 +-
 .../iotdb/tsfile/read/common/ChunkSuit4CPV.java    | 123 ++++++++++----
 .../tsfile/read/reader/chunk/ChunkReader.java      |   1 -
 .../iotdb/tsfile/read/reader/page/PageReader.java  |  78 +++++++--
 11 files changed, 403 insertions(+), 151 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
index 2cabc62d4e..663003b77f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
@@ -164,15 +164,15 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
           // C: decompress page data, split time&value buffers
           List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList(
               chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
-          if (pageReaderList.size() > 1) {
-            throw new IOException("Against the assumption that there is only one page in a chunk!");
-          }
-          for (IPageReader pageReader : pageReaderList) {
-            // assume only one page in a chunk
-            // assume all data on disk, no data in memory
-            ((PageReader) pageReader).split4CPV(startTime, endTime, interval, curStartTime,
-                currentChunkList, splitChunkList, chunkMetadata);
-          }
+//          if (pageReaderList.size() > 1) {
+//            throw new IOException("Against the assumption that there is only one page in a chunk!");
+//          }
+//          for (IPageReader pageReader : pageReaderList) {
+          // assume only one page in a chunk
+          // assume all data on disk, no data in memory
+          ((PageReader) pageReaderList.get(0)).split4CPV(startTime, endTime, interval, curStartTime,
+              currentChunkList, splitChunkList, chunkMetadata);
+//          }
         }
       }
     }
@@ -307,20 +307,32 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
      // If the chunks holding all points in the set are lazy-loaded, load all of them, apply
      // deleteIntervals, and drop the BP (whether the point was removed by a delete or
      // invalidated by an update, it is gone either way)
      if (nonLazyLoad.size() == 0) {
-        // TODO chunk data read operation (c): get all data points
        for (ChunkSuit4CPV chunkSuit4CPV : candidateSet) {
-          if (chunkSuit4CPV.getBatchData() == null) {
-            currentChunkList.remove(chunkSuit4CPV); // TODO check this
+          // TODO mind the propagation of delete intervals
+          if (chunkSuit4CPV.getPageReader() == null) {
            List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList(
                chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
-            for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk
-              ((PageReader) pageReader).split4CPV(startTime, endTime, interval, curStartTime,
-                  currentChunkList, // add back into currentChunkList with loaded data
-                  null, chunkSuit4CPV.getChunkMetadata());
-            }
-          } else { // already loaded, e.g., split by the M4 intervals at the beginning, and now invalidated as candidate by an update
-            updateBatchData(chunkSuit4CPV, tsDataType);
+            chunkSuit4CPV.setPageReader((PageReader) pageReaderList.get(0));
+          } else {
+            // TODO mind the propagation of delete intervals: mainly, overwritten points are propagated as point deletes
+            chunkSuit4CPV.getPageReader()
+                .setDeleteIntervalList(chunkSuit4CPV.getChunkMetadata().getDeleteIntervalList());
          }
+          // TODO chunk data read operation (c): get all data points
+          chunkSuit4CPV.getPageReader().updateBPTP(chunkSuit4CPV);
+
+//          if (chunkSuit4CPV.getBatchData() == null) {
+//            currentChunkList.remove(chunkSuit4CPV); // TODO check this
+//            List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList(
+//                chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
+//            for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk
+//              ((PageReader) pageReader).split4CPV(startTime, endTime, interval, curStartTime,
+//                  currentChunkList, // add back into currentChunkList with loaded data
+//                  null, chunkSuit4CPV.getChunkMetadata());
+//            }
+//          } else { // already loaded, e.g., split by the M4 intervals at the beginning, and now invalidated as candidate by an update
+//            updateBatchData(chunkSuit4CPV, tsDataType);
+//          }
        }
        break; // exit loop 2, enter loop 1
      }
@@ -329,14 +341,15 @@
      MergeReaderPriority candidateVersion = new MergeReaderPriority(
          candidate.getChunkMetadata().getVersion(),
          candidate.getChunkMetadata().getOffsetOfChunkHeader());
-      long candidateTimestamp = candidate.getChunkMetadata().getStatistics()
-          .getBottomTimestamp(); // TODO check
-      Object candidateValue = candidate.getChunkMetadata().getStatistics()
-          .getMinValue(); // TODO check
+      long candidateTimestamp = candidate.getStatistics().getBottomTimestamp(); // TODO check
+      Object candidateValue = candidate.getStatistics().getMinValue(); // TODO check

      // verify if this candidate point is deleted
      boolean isDeletedItself = false;
-      if (candidate.getChunkMetadata().getDeleteIntervalList() != null) {
+      // TODO add M4 interval virtual delete, since BP/TP are not updated in getCurrentChunkListFromFutureChunkList
+      if (candidateTimestamp < curStartTime || candidateTimestamp >= curStartTime + interval) {
+        isDeletedItself = true;
+      } else if (candidate.getChunkMetadata().getDeleteIntervalList() != null) {
        for (TimeRange timeRange : candidate.getChunkMetadata().getDeleteIntervalList()) {
          if (timeRange.contains(candidateTimestamp)) {
            isDeletedItself = true;
@@ -352,7 +365,7 @@
        continue; // back to loop 2
      } else { // not deleted
-
+        boolean isUpdate = false;
        // find all higher-version chunks that overlap the candidate
        List<ChunkSuit4CPV> overlaps = new ArrayList<>();
        for (ChunkSuit4CPV chunkSuit4CPV : currentChunkList) {
@@ -362,79 +375,90 @@
          if (version.compareTo(candidateVersion) <= 0) { // including bottomChunkMetadata
            continue;
          }
-          if (candidateTimestamp < chunkMetadata.getStartTime()
-              || candidateTimestamp > chunkMetadata.getEndTime()) {
+          if (candidateTimestamp < chunkSuit4CPV.getStatistics().getStartTime()
+              || candidateTimestamp > chunkSuit4CPV.getStatistics().getEndTime()) {
            continue;
          }
+          if (candidateTimestamp == chunkSuit4CPV.getStatistics().getStartTime()
+              || candidateTimestamp == chunkSuit4CPV.getStatistics().getEndTime()) {
+            isUpdate = true;
+            // this case does not need to execute chunk data read operation (a),
+            // because the candidate is definitely overwritten
+            break;
+          }
          overlaps.add(chunkSuit4CPV);
        }

-        if (overlaps.size() == 0) { // not overlapped: the current candidate point is the final result; done
+        if (!isUpdate && overlaps.size() == 0) { // not overlapped: the current candidate point is the final result; done
          results.get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
              // minValue[bottomTimestamp], maxValue[topTimestamp]
              .updateResultUsingValues(new long[]{candidateTimestamp}, 1,
                  new Object[]{candidateValue});
          // TODO check updateResult
          return; // computation finished
-        } else { // overlapped: partial-scan all of these overlapping chunks
-          // TODO chunk data read operation (a): check existence of data point at a timestamp
-          boolean isUpdate = false;
+        } else if (!isUpdate) { // overlapped: partial-scan all of these overlapping chunks
          for (ChunkSuit4CPV chunkSuit4CPV : overlaps) { // scan this chunk's data
-            if (chunkSuit4CPV.getBatchData() == null) {
+            // TODO chunk data read operation (a): check existence of data point at a timestamp
+            if (chunkSuit4CPV.getPageReader() == null) {
              List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList(
                  chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
-              List<ChunkSuit4CPV> tmpCurrentChunkList = new ArrayList<>();
-              for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk
-                isUpdate = ((PageReader) pageReader).partialScan(
-                    candidateTimestamp); // TODO check
-              }
-            } else {
-              // partial-scan the already loaded batchData until a point's timestamp is greater
-              // than or equal to candidateTimestamp
-              BatchDataIterator batchDataIterator = chunkSuit4CPV.getBatchData()
-                  .getBatchDataIterator();
-              while (batchDataIterator.hasNextTimeValuePair()) {
-                long timestamp = batchDataIterator.nextTimeValuePair().getTimestamp();
-                if (timestamp > candidateTimestamp) {
-                  break;
-                }
-                if (timestamp == candidateTimestamp) {
-                  isUpdate = true;
-                  break;
-                }
-              }
-              chunkSuit4CPV.getBatchData()
-                  .resetBatchData(); // This step is necessary, because this BatchData may be
-              // accessed multiple times!
+              chunkSuit4CPV.setPageReader((PageReader) pageReaderList.get(0));
            }
+            isUpdate = chunkSuit4CPV.checkIfExist(candidateTimestamp);
+//            if (chunkSuit4CPV.getPageReader() == null) {
+//              List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList(
+//                  chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
+//              for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk
+//                isUpdate = ((PageReader) pageReader).partialScan(
+//                    candidateTimestamp); // TODO check
+//              }
+//            } else {
+//              // partial-scan the already loaded batchData until a point's timestamp is greater
+//              // than or equal to candidateTimestamp
+//              BatchDataIterator batchDataIterator = chunkSuit4CPV.getBatchData()
+//                  .getBatchDataIterator();
+//              while (batchDataIterator.hasNextTimeValuePair()) {
+//                long timestamp = batchDataIterator.nextTimeValuePair().getTimestamp();
+//                if (timestamp > candidateTimestamp) {
+//                  break;
+//                }
+//                if (timestamp == candidateTimestamp) {
+//                  isUpdate = true;
+//                  break;
+//                }
+//              }
+//              chunkSuit4CPV.getBatchData()
+//                  .resetBatchData(); // This step is necessary, because this BatchData may be
+//              // accessed multiple times!
+//            }
            if (isUpdate) { // stop scanning the overlapping chunks early: an updated point
              // has been found, which invalidates the candidate
              break;
            }
          }
-          if (!isUpdate) { // no such point found after partial-scanning all overlapping chunks:
-            // the current candidate point is the final result; done
-            results.get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
-                // minValue[bottomTimestamp], maxValue[topTimestamp]
-                .updateResultUsingValues(new long[]{candidateTimestamp}, 1,
-                    new Object[]{candidateValue});
-            // TODO check updateResult
-            return; // computation finished
-          } else { // such a point was found, so mark the chunk holding the candidate point as
-            // lazy-load, add a delete of that timestamp to the deleteIntervalList of its
-            // chunkMetadata, and then go to loop 2
-            if (candidate.getChunkMetadata().getDeleteIntervalList() == null) {
-              List<TimeRange> tmp = new ArrayList<>();
-              tmp.add(new TimeRange(candidateTimestamp, candidateTimestamp));
-              candidate.getChunkMetadata().setDeleteIntervalList(tmp);
-            } else {
-              candidate.getChunkMetadata().getDeleteIntervalList()
-                  .add(new TimeRange(candidateTimestamp, candidateTimestamp)); // TODO check
-            }
-            // deletes need no extra bookkeeping there, but updates here require manually adding
-            // the delete operation
-            nonLazyLoad.remove(candidate);
-            // TODO check this can really remove the element
-            // TODO check whether nonLazyLoad remove affects candidateSet
-            // TODO check nonLazyLoad sorted by version number from high to low
-            continue; // back to loop 2
+        }
+        if (!isUpdate) { // no such point found after partial-scanning all overlapping chunks:
+          // the current candidate point is the final result; done
+          results.get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
+              // minValue[bottomTimestamp], maxValue[topTimestamp]
+              .updateResultUsingValues(new long[]{candidateTimestamp}, 1,
+                  new Object[]{candidateValue});
+          // TODO check updateResult
+          return; // computation finished
+        } else { // such a point was found, so mark the chunk holding the candidate point as
+          // lazy-load, add a delete of that timestamp to the deleteIntervalList of its
+          // chunkMetadata, and then go to loop 2
+          if (candidate.getChunkMetadata().getDeleteIntervalList() == null) {
+            List<TimeRange> tmp = new ArrayList<>();
+            tmp.add(new TimeRange(candidateTimestamp, candidateTimestamp));
+            candidate.getChunkMetadata().setDeleteIntervalList(tmp);
+          } else {
+            candidate.getChunkMetadata().getDeleteIntervalList()
+                .add(new TimeRange(candidateTimestamp, candidateTimestamp)); // TODO check
          }
+          // deletes need no extra bookkeeping there, but updates here require manually adding
+          // the delete operation
+          nonLazyLoad.remove(candidate);
+          // TODO check this can really remove the element
+          // TODO check whether nonLazyLoad remove affects candidateSet
+          // TODO check nonLazyLoad sorted by version number from high to low
+          continue; // back to loop 2
        }
      }
    }
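For orientation, the candidate-verification logic in the hunks above reduces to three tests. Below is a self-contained sketch using simplified, hypothetical types (CandidateCheck, Range, and survives are illustrative names, not the committed API); the real code additionally distinguishes a provable overwrite (candidate timestamp equal to an overlapping chunk's start/end) from the case that needs a partial scan, where this sketch is simply pessimistic:

    import java.util.List;

    final class CandidateCheck {
      record Range(long min, long max) {
        boolean contains(long t) {
          return min <= t && t <= max;
        }
      }

      // A bottom-point candidate at timestamp t survives only if it lies inside
      // the current M4 interval (the "virtual delete"), is not covered by a
      // delete range, and no higher-version chunk overlapping t can overwrite it.
      static boolean survives(long t, long curStartTime, long interval,
          List<Range> deletes, List<Range> higherVersionChunks) {
        if (t < curStartTime || t >= curStartTime + interval) {
          return false; // virtual M4 delete: outside the current interval
        }
        for (Range d : deletes) {
          if (d.contains(t)) {
            return false; // physically deleted
          }
        }
        for (Range c : higherVersionChunks) {
          if (c.contains(t)) {
            return false; // pessimistic: a higher-version chunk may overwrite t
          }
        }
        return true;
      }
    }

Only when a candidate fails the last test does the executor fall back to chunk data read operation (a), the per-timestamp existence probe.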
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
index 4d15179508..5336e89366 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
@@ -46,6 +46,14 @@ public class BinaryStatistics extends Statistics<Binary> {
     return 4 + firstValue.getValues().length + 4 + lastValue.getValues().length;
   }
 
+  @Override
+  public void setMinInfo(MinMaxInfo minInfo) {
+  }
+
+  @Override
+  public void setMaxInfo(MinMaxInfo maxInfo) {
+  }
+
   /**
    * initialize Statistics.
    *
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java
index b751175cab..37782f1eda 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java
@@ -45,6 +45,13 @@ public class BooleanStatistics extends Statistics<Boolean> {
     return 10;
   }
 
+  @Override
+  public void setMinInfo(MinMaxInfo minInfo) {
+  }
+
+  @Override
+  public void setMaxInfo(MinMaxInfo maxInfo) {
+  }
   /**
    * initialize boolean Statistics.
    *
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java
index a9d1faf191..8771ff089d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java
@@ -46,6 +46,16 @@ public class DoubleStatistics extends Statistics<Double> {
     this.maxInfo = new MinMaxInfo<>(Double.MIN_VALUE, -1);
   }
 
+  @Override
+  public void setMinInfo(MinMaxInfo minInfo) {
+    this.minInfo = minInfo;
+  }
+
+  @Override
+  public void setMaxInfo(MinMaxInfo maxInfo) {
+    this.maxInfo = maxInfo;
+  }
+
   @Override
   public TSDataType getType() {
     return TSDataType.DOUBLE;
@@ -155,7 +165,7 @@ public class DoubleStatistics extends Statistics<Double> {
 
   /** @author Yuyuan Kang */
   @Override
-  void updateStats(double value, long timestamp) {
+  public void updateStats(double value, long timestamp) {
     if (this.isEmpty) {
       initializeStats(value, timestamp, value, timestamp, value, value, value);
       isEmpty = false;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java
index 335d4b0dee..c79aafb0a9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java
@@ -47,6 +47,16 @@ public class FloatStatistics extends Statistics<Float> {
     maxInfo = new MinMaxInfo<>(Float.MIN_VALUE, -1);
   }
 
+  @Override
+  public void setMinInfo(MinMaxInfo minInfo) {
+    this.minInfo = minInfo;
+  }
+
+  @Override
+  public void setMaxInfo(MinMaxInfo maxInfo) {
+    this.maxInfo = maxInfo;
+  }
+
   @Override
   public TSDataType getType() {
     return TSDataType.FLOAT;
@@ -157,7 +167,7 @@ public class FloatStatistics extends Statistics<Float> {
 
   /** @author Yuyuan Kang */
   @Override
-  void updateStats(float value, long timestamp) {
+  public void updateStats(float value, long timestamp) {
     if (this.isEmpty) {
       initializeStats(value, timestamp, value, timestamp, value, value, value);
       isEmpty = false;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java
index 670ee5d604..3ab173fe60 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java
@@ -161,7 +161,7 @@ public class IntegerStatistics extends Statistics<Integer> {
    * @author Yuyuan Kang
    */
   @Override
-  void updateStats(int value, long timestamp) {
+  public void updateStats(int value, long timestamp) {
     if (isEmpty) {
       initializeStats(value, timestamp, value, timestamp, value, value, value);
       isEmpty = false;
@@ -170,6 +170,16 @@ public class IntegerStatistics extends Statistics<Integer> {
     }
   }
 
+  @Override
+  public void setMinInfo(MinMaxInfo minInfo) {
+    this.minInfo = minInfo;
+  }
+
+  @Override
+  public void setMaxInfo(MinMaxInfo maxInfo) {
+    this.maxInfo = maxInfo;
+  }
+
   /**
    * @author Yuyuan Kang
    */
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java
index a9e2f4608c..e68f9beabc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java
@@ -18,18 +18,19 @@
  */
 package org.apache.iotdb.tsfile.file.metadata.statistics;
 
-import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 public class LongStatistics extends Statistics<Long> {
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   private MinMaxInfo<Long> minInfo;
   private MinMaxInfo<Long> maxInfo;
@@ -45,13 +46,17 @@ public class LongStatistics extends Statistics<Long> {
     return TSDataType.INT64;
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   public LongStatistics() {
     this.minInfo = new MinMaxInfo<>(Long.MAX_VALUE, -1);
     this.maxInfo = new MinMaxInfo<>(Long.MIN_VALUE, -1);
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   @Override
   public int getStatsSize() {
     int len = 0;
@@ -65,7 +70,9 @@ public class LongStatistics extends Statistics<Long> {
     return len;
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   public void initializeStats(
       MinMaxInfo<Long> minInfo, MinMaxInfo<Long> maxInfo, long firstValue, long last, double sum) {
     this.minInfo = new MinMaxInfo<>(minInfo);
@@ -75,7 +82,9 @@ public class LongStatistics extends Statistics<Long> {
     this.sumValue += sum;
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   public void initializeStats(
       long min,
       long bottomTimestamp,
@@ -91,7 +100,9 @@ public class LongStatistics extends Statistics<Long> {
     this.sumValue += sum;
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   private void updateStats(
       long minValue,
       long bottomTimestamp,
@@ -105,7 +116,19 @@ public class LongStatistics extends Statistics<Long> {
     this.lastValue = lastValue;
   }
 
-  /** @author Yuyuan Kang */
+  @Override
+  public void setMinInfo(MinMaxInfo minInfo) {
+    this.minInfo = minInfo;
+  }
+
+  @Override
+  public void setMaxInfo(MinMaxInfo maxInfo) {
+    this.maxInfo = maxInfo;
+  }
+
+  /**
+   * @author Yuyuan Kang
+   */
   private void updateStats(
       MinMaxInfo<Long> minInfo,
       MinMaxInfo<Long> maxInfo,
@@ -134,37 +157,49 @@ public class LongStatistics extends Statistics<Long> {
   //    maxValue = BytesUtils.bytesToLong(maxBytes);
   //  }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   @Override
   public MinMaxInfo<Long> getMinInfo() {
     return minInfo;
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   @Override
   public MinMaxInfo<Long> getMaxInfo() {
     return maxInfo;
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   @Override
   public Long getMinValue() {
     return this.minInfo.val;
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   @Override
   public Long getMaxValue() {
     return this.maxInfo.val;
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   @Override
   public long getBottomTimestamp() {
     return this.minInfo.timestamp;
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   @Override
   public long getTopTimestamp() {
     return this.maxInfo.timestamp;
@@ -206,9 +241,11 @@ public class LongStatistics extends Statistics<Long> {
     throw new StatisticsClassException("Long statistics does not support: long sum");
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   @Override
-  void updateStats(long value, long timestamp) {
+  public void updateStats(long value, long timestamp) {
     if (isEmpty) {
       initializeStats(value, timestamp, value, timestamp, value, value, value);
       isEmpty = false;
@@ -217,7 +254,9 @@ public class LongStatistics extends Statistics<Long> {
     }
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   @Override
   void updateStats(long[] values, long[] timestamps, int batchSize) {
     for (int i = 0; i < batchSize; i++) {
@@ -225,7 +264,9 @@ public class LongStatistics extends Statistics<Long> {
     }
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   @Override
   public void updateStats(long minValue, long bottomTimestamp, long maxValue, long topTimestamp) {
     updateMinInfo(minValue, bottomTimestamp);
@@ -237,7 +278,9 @@ public class LongStatistics extends Statistics<Long> {
     return LONG_STATISTICS_FIXED_RAM_SIZE;
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   @Override
   protected void mergeStatisticsValue(Statistics stats) {
     LongStatistics longStats = (LongStatistics) stats;
@@ -261,7 +304,9 @@ public class LongStatistics extends Statistics<Long> {
     }
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   @Override
   public void updateMinInfo(Long val, long timestamp) {
     if (val < this.minInfo.val) {
@@ -269,7 +314,9 @@ public class LongStatistics extends Statistics<Long> {
     }
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   @Override
   public void updateMaxInfo(Long val, long timestamp) {
     if (val > this.maxInfo.val) {
@@ -277,7 +324,9 @@ public class LongStatistics extends Statistics<Long> {
     }
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   @Override
   public int serializeStats(OutputStream outputStream) throws IOException {
     int byteLen = 0;
@@ -289,7 +338,9 @@ public class LongStatistics extends Statistics<Long> {
     return byteLen;
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   @Override
   public void deserialize(InputStream inputStream) throws IOException {
     this.minInfo = ReadWriteIOUtils.readMinMaxInfo(inputStream, minMaxDataType);
@@ -299,7 +350,9 @@ public class LongStatistics extends Statistics<Long> {
     this.sumValue = ReadWriteIOUtils.readDouble(inputStream);
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   @Override
   public void deserialize(ByteBuffer byteBuffer) {
     this.minInfo = ReadWriteIOUtils.readMinMaxInfo(byteBuffer, minMaxDataType);
@@ -309,7 +362,9 @@ public class LongStatistics extends Statistics<Long> {
     this.sumValue = ReadWriteIOUtils.readDouble(byteBuffer);
   }
 
-  /** @author Yuyuan Kang */
+  /**
+   * @author Yuyuan Kang
+   */
   @Override
   public String toString() {
     return super.toString()
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index 70747f2d69..2a19854f1c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -159,6 +159,10 @@ public abstract class Statistics<T> {
 
   public abstract T getMinValue();
 
+  public abstract void setMinInfo(MinMaxInfo minInfo);
+
+  public abstract void setMaxInfo(MinMaxInfo maxInfo);
+
   public abstract T getMaxValue();
 
   /** @author Yuyuan Kang */
@@ -408,22 +412,22 @@ public abstract class Statistics<T> {
   }
 
   /** @author Yuyuan Kang */
-  void updateStats(int value, long timestamp) {
+  public void updateStats(int value, long timestamp) {
     throw new UnsupportedOperationException();
   }
 
   /** @author Yuyuan Kang */
-  void updateStats(long value, long timestamp) {
+  public void updateStats(long value, long timestamp) {
     throw new UnsupportedOperationException();
   }
 
   /** @author Yuyuan Kang */
-  void updateStats(float value, long timestamp) {
+  public void updateStats(float value, long timestamp) {
     throw new UnsupportedOperationException();
   }
 
   /** @author Yuyuan Kang */
-  void updateStats(double value, long timestamp) {
+  public void updateStats(double value, long timestamp) {
     throw new UnsupportedOperationException();
   }
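The setMinInfo/setMaxInfo hooks added across the Statistics hierarchy all move MinMaxInfo values around: a pair coupling an extreme value with the timestamp at which it occurs, so that BP/TP can be reported as (value, timestamp) rather than a bare value. A simplified standalone sketch of the idea (hypothetical class, not the committed MinMaxInfo<T>):

    // Couples an extreme value with the timestamp where it occurred.
    class MinMax<T extends Comparable<T>> {
      T val;
      long timestamp;

      MinMax(T val, long timestamp) {
        this.val = val;
        this.timestamp = timestamp;
      }

      // Keep the smaller value, remembering when it occurred.
      void updateMin(T v, long t) {
        if (v.compareTo(val) < 0) {
          val = v;
          timestamp = t;
        }
      }
    }

The Binary and Boolean overrides are deliberate no-ops, since non-numeric types carry no BP/TP.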
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java
index 26d97a60be..03630eb011 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java
@@ -37,6 +37,10 @@ public class ChunkSuit4CPV {
 
   public Statistics statistics; // includes FP/LP/BP/TP info, may be updated
 
+  // [startPos,endPos] is guaranteed to lie within the curStartTime interval, thanks to split4CPV
+  public int startPos = -1; // the first point position, starting from 0
+  public int endPos = -1; // the last point position, starting from 0
+
   //  public long startTime; // statistics in chunkMetadata is not deepCopied, so store update here
   //
   //  public long endTime;
@@ -61,7 +65,7 @@ public class ChunkSuit4CPV {
   //  public MinMaxInfo<Float> maxInfoFloat;
   //  public MinMaxInfo<Double> maxInfoDouble;
 
-//  private BatchData batchData; // deprecated
+  private BatchData batchData; // deprecated
 
   private PageReader pageReader; // bears plain timeBuffer and valueBuffer
 
@@ -81,6 +85,8 @@ public class ChunkSuit4CPV {
     } else {
       statistics = chunkMetadata.getStatistics();
     }
+    this.startPos = 0;
+    this.endPos = chunkMetadata.getStatistics().getCount() - 1;
   }
 
   //  public ChunkSuit4CPV(ChunkMetadata chunkMetadata, BatchData batchData) {
@@ -99,6 +105,8 @@ public class ChunkSuit4CPV {
     } else {
       statistics = chunkMetadata.getStatistics();
     }
+    this.startPos = 0;
+    this.endPos = chunkMetadata.getStatistics().getCount() - 1;
   }
 
   public Statistics getStatistics() {
@@ -176,17 +184,17 @@ public class ChunkSuit4CPV {
     return chunkMetadata;
   }
 
-//  public BatchData getBatchData() {
-//    return batchData;
-//  }
+  public BatchData getBatchData() {
+    return batchData;
+  }
 
   public PageReader getPageReader() {
     return pageReader;
   }
 
-//  public void setBatchData(BatchData batchData) {
-//    this.batchData = batchData;
-//  }
+  public void setBatchData(BatchData batchData) {
+    this.batchData = batchData;
+  }
 
   public void setPageReader(PageReader pageReader) {
     this.pageReader = pageReader;
@@ -226,9 +234,9 @@ public class ChunkSuit4CPV {
    * chunk.
    *
    * @param targetTimestamp must be within the chunk time range [startTime, endTime]
-   * @return the point with value and timestamp
+   * @return the position of the point, starting from 0
    */
-  public MinMaxInfo findTheClosetPointEqualOrAfter(long targetTimestamp) throws IOException {
+  public int updateFPwithTheClosetPointEqualOrAfter(long targetTimestamp) throws IOException {
     StepRegress stepRegress = chunkMetadata.getStatistics().getStepRegress();
     // infer position starts from 1, so minus 1 here // TODO debug buffer.get(index)
     int estimatedPos = (int) Math.round(stepRegress.infer(targetTimestamp)) - 1;
@@ -247,30 +255,38 @@ public class ChunkSuit4CPV {
         estimatedPos++;
       } // else equal
     } // else equal
+    this.startPos = estimatedPos; // note this
 
     // since we have constrained that targetTimestamp must be within the chunk time range [startTime, endTime],
     // we can definitely find such a point with the closest timestamp equal to or larger than the
     // given timestamp in the chunk.
+    long timestamp = pageReader.timeBuffer.getLong(estimatedPos * 8);
+    statistics.setStartTime(timestamp);
     switch (chunkMetadata.getDataType()) {
      // IoTDB's PLAIN encoding for the int type is home-grown and does not support random access
      //      case INT32:
      //        return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4),
      //            pageReader.timeBuffer.getLong(estimatedPos * 8));
       case INT64:
-        return new MinMaxInfo(
-            pageReader.valueBuffer.getLong(pageReader.timeBufferLength + estimatedPos * 8),
-            pageReader.timeBuffer.getLong(estimatedPos * 8));
+        long longVal = pageReader.valueBuffer.getLong(
+            pageReader.timeBufferLength + estimatedPos * 8);
+        ((LongStatistics) statistics).setFirstValue(longVal);
+        break;
       case FLOAT:
-        return new MinMaxInfo(
-            pageReader.valueBuffer.getFloat(pageReader.timeBufferLength + estimatedPos * 4),
-            pageReader.timeBuffer.getLong(estimatedPos * 8));
+        float floatVal = pageReader.valueBuffer.getFloat(
+            pageReader.timeBufferLength + estimatedPos * 4);
+        ((FloatStatistics) statistics).setFirstValue(floatVal);
+        break;
       case DOUBLE:
-        return new MinMaxInfo(
-            pageReader.valueBuffer.getDouble(pageReader.timeBufferLength + estimatedPos * 8),
-            pageReader.timeBuffer.getLong(estimatedPos * 8));
+        double doubleVal = pageReader.valueBuffer.getDouble(
+            pageReader.timeBufferLength + estimatedPos * 8);
+        ((DoubleStatistics) statistics).setFirstValue(doubleVal);
+        break;
       default:
         throw new IOException("Unsupported data type!");
     }
+
+    return estimatedPos;
   }
 
   /**
@@ -278,9 +294,9 @@ public class ChunkSuit4CPV {
    * chunk.
    *
    * @param targetTimestamp must be within the chunk time range [startTime, endTime]
-   * @return the point with value and timestamp
+   * @return the position of the point, starting from 0
   */
-  public MinMaxInfo findTheClosetPointEqualOrBefore(long targetTimestamp) throws IOException {
+  public int updateLPwithTheClosetPointEqualOrBefore(long targetTimestamp) throws IOException {
     StepRegress stepRegress = chunkMetadata.getStatistics().getStepRegress();
     // infer position starts from 1, so minus 1 here
     int estimatedPos = (int) Math.round(stepRegress.infer(targetTimestamp)) - 1;
@@ -298,30 +314,37 @@ public class ChunkSuit4CPV {
         estimatedPos--;
       } // else equal
     } // else equal
+    this.endPos = estimatedPos; // note this
 
     // since we have constrained that targetTimestamp must be within the chunk time range [startTime, endTime],
     // we can definitely find such a point with the closest timestamp equal to or smaller than the
     // given timestamp in the chunk.
+    long timestamp = pageReader.timeBuffer.getLong(estimatedPos * 8);
+    statistics.setEndTime(timestamp);
     switch (chunkMetadata.getDataType()) {
      // IoTDB's PLAIN encoding for the int type is home-grown and does not support random access
      //      case INT32:
      //        return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4),
      //            pageReader.timeBuffer.getLong(estimatedPos * 8));
       case INT64:
-        return new MinMaxInfo(
-            pageReader.valueBuffer.getLong(pageReader.timeBufferLength + estimatedPos * 8),
-            pageReader.timeBuffer.getLong(estimatedPos * 8));
+        long longVal = pageReader.valueBuffer.getLong(
+            pageReader.timeBufferLength + estimatedPos * 8);
+        ((LongStatistics) statistics).setLastValue(longVal);
+        break;
       case FLOAT:
-        return new MinMaxInfo(
-            pageReader.valueBuffer.getFloat(pageReader.timeBufferLength + estimatedPos * 4),
-            pageReader.timeBuffer.getLong(estimatedPos * 8));
+        float floatVal = pageReader.valueBuffer.getFloat(
+            pageReader.timeBufferLength + estimatedPos * 4);
+        ((FloatStatistics) statistics).setLastValue(floatVal);
+        break;
       case DOUBLE:
-        return new MinMaxInfo(
-            pageReader.valueBuffer.getDouble(pageReader.timeBufferLength + estimatedPos * 8),
-            pageReader.timeBuffer.getLong(estimatedPos * 8));
+        double doubleVal = pageReader.valueBuffer.getDouble(
+            pageReader.timeBufferLength + estimatedPos * 8);
+        ((DoubleStatistics) statistics).setLastValue(doubleVal);
+        break;
       default:
         throw new IOException("Unsupported data type!");
     }
+    return estimatedPos;
   }
 
   /**
@@ -405,4 +428,46 @@ public class ChunkSuit4CPV {
     }
   }
 
+  public void updateBP(MinMaxInfo point) {
+    long timestamp = point.timestamp;
+    Object val = point.val;
+    switch (chunkMetadata.getDataType()) {
+      case INT32:
+        ((IntegerStatistics) statistics).setMinInfo(timestamp, (int) val);
+        break;
+      case INT64:
+        ((LongStatistics) statistics).setMinInfo(timestamp, (long) val);
+        break;
+      case FLOAT:
+        ((FloatStatistics) statistics).setMinInfo(timestamp, (float) val);
+        break;
+      case DOUBLE:
+        ((DoubleStatistics) statistics).setMinInfo(timestamp, (double) val);
+        break;
+      default:
+        break;
+    }
+  }
+
+  public void updateTP(MinMaxInfo point) {
+    long timestamp = point.timestamp;
+    Object val = point.val;
+    switch (chunkMetadata.getDataType()) {
+      case INT32:
+        ((IntegerStatistics) statistics).setMaxInfo(timestamp, (int) val);
+        break;
+      case INT64:
+        ((LongStatistics) statistics).setMaxInfo(timestamp, (long) val);
+        break;
+      case FLOAT:
+        ((FloatStatistics) statistics).setMaxInfo(timestamp, (float) val);
+        break;
+      case DOUBLE:
+        ((DoubleStatistics) statistics).setMaxInfo(timestamp, (double) val);
+        break;
+      default:
+        break;
+    }
+  }
 }
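The two update methods above share one pattern: infer an approximate position from the StepRegress index, then step locally until the timestamp constraint holds. A standalone sketch of that search over a plain sorted long[] (assumed inputs; not the committed code):

    // Returns the first position whose timestamp is >= target. The learned-index
    // estimate only shortens the walk; correctness comes from the local adjustment.
    // Caller must guarantee target <= timestamps[timestamps.length - 1].
    static int firstAtOrAfter(long[] timestamps, int estimatedPos, long target) {
      int pos = Math.max(0, Math.min(estimatedPos, timestamps.length - 1));
      while (pos > 0 && timestamps[pos - 1] >= target) {
        pos--; // overshot: walk left
      }
      while (timestamps[pos] < target) {
        pos++; // undershot: walk right
      }
      return pos;
    }

The equal-or-before variant is the mirror image, walking right while the next timestamp is still <= target and left while the current one exceeds it.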
uncompress size: " + pageHeader.getUncompressedSize() diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java index 218aae9167..f2345d3a7d 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java @@ -28,7 +28,9 @@ import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.header.PageHeader; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo; +import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.common.BatchDataFactory; @@ -123,25 +125,29 @@ public class PageReader implements IPageReader { long rightEndExcluded = curStartTime + (n + 1) * interval; ChunkSuit4CPV chunkSuit4CPV = new ChunkSuit4CPV(chunkMetadata, this, true); // TODO update FP,LP with the help of stepRegress index. BP/TP not update here. - MinMaxInfo FP = null; // new FP - MinMaxInfo LP = null; // new LP +// MinMaxInfo FP = null; // new FP +// MinMaxInfo LP = null; // new LP + int FP_pos = -1; + int LP_pos = -1; if (leftEndIncluded > chunkSuit4CPV.statistics.getStartTime()) { - FP = chunkSuit4CPV.findTheClosetPointEqualOrAfter(leftEndIncluded); - chunkSuit4CPV.updateFP(FP); +// FP = chunkSuit4CPV.findTheClosetPointEqualOrAfter(leftEndIncluded); +// chunkSuit4CPV.updateFP(FP); + FP_pos = chunkSuit4CPV.updateFPwithTheClosetPointEqualOrAfter(leftEndIncluded); } if (rightEndExcluded <= chunkSuit4CPV.statistics.getEndTime()) { // -1 is because right end is excluded end - LP = chunkSuit4CPV.findTheClosetPointEqualOrBefore(rightEndExcluded - 1); - chunkSuit4CPV.updateLP(LP); + LP_pos = chunkSuit4CPV.updateLPwithTheClosetPointEqualOrBefore(rightEndExcluded - 1); +// chunkSuit4CPV.updateLP(LP); } - if (FP != null && LP != null && FP.timestamp > LP.timestamp) { + if (FP_pos != -1 && LP_pos != -1 && FP_pos > LP_pos) { // the chunk has no point in this span, do nothing continue; } else { // add this chunkSuit4CPV into currentChunkList or splitChunkList if (n == 0) { currentChunkList.add(chunkSuit4CPV); } else { - int idx = (int) Math.floor((FP.timestamp - startTime) * 1.0 / interval); // global index + int idx = (int) Math.floor((chunkSuit4CPV.statistics.getStartTime() - startTime) * 1.0 + / interval); // global index TODO debug this splitChunkList.computeIfAbsent(idx, k -> new ArrayList<>()); splitChunkList.get(idx).add(chunkSuit4CPV); } @@ -277,6 +283,60 @@ public class PageReader implements IPageReader { // } // } + public void updateBPTP(ChunkSuit4CPV chunkSuit4CPV) throws IOException { + Statistics statistics = null; + switch (dataType) { + case INT64: + statistics = new LongStatistics(); + break; + case FLOAT: + statistics = new FloatStatistics(); + break; + case DOUBLE: + statistics = new DoubleStatistics(); + break; + default: + break; + } + // [startPos,endPos] definitely for curStartTime interval, thanks to split4CPV + for (int pos = chunkSuit4CPV.startPos; pos <= chunkSuit4CPV.endPos; 
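In the split4CPV hunk above, the global index is the usual M4 mapping of a timestamp to its interval; the commit switches the key from FP.timestamp to the freshly updated statistics.getStartTime(), which holds the same value once updateFPwithTheClosetPointEqualOrAfter has run. A self-contained illustration of the mapping (hypothetical values):

    // Each timestamp t >= startTime falls into bucket floor((t - startTime) / interval);
    // for non-negative operands, Java integer division is exactly that floor.
    // E.g., startTime = 0, interval = 100: t = 250 lands in bucket 2, i.e., [200, 300).
    static int bucketOf(long t, long startTime, long interval) {
      return (int) ((t - startTime) / interval);
    }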
@@ -277,6 +283,60 @@
   //    }
   //  }
 
+  public void updateBPTP(ChunkSuit4CPV chunkSuit4CPV) throws IOException {
+    Statistics statistics = null;
+    switch (dataType) {
+      case INT64:
+        statistics = new LongStatistics();
+        break;
+      case FLOAT:
+        statistics = new FloatStatistics();
+        break;
+      case DOUBLE:
+        statistics = new DoubleStatistics();
+        break;
+      default:
+        break;
+    }
+    // [startPos,endPos] is guaranteed to lie within the curStartTime interval, thanks to split4CPV
+    for (int pos = chunkSuit4CPV.startPos; pos <= chunkSuit4CPV.endPos; pos++) {
+      long timestamp = timeBuffer.getLong(pos * 8);
+      switch (dataType) {
+        case INT64:
+          long aLong = valueBuffer.getLong(timeBufferLength + pos * 8);
+          if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aLong))) {
+            // update statistics of chunkMetadata1
+            statistics.updateStats(aLong, timestamp); // TODO DEBUG
+            // ATTENTION: do not use the update() interface, which would also update StepRegress!
+            // Only updateStats: in fact only BP and TP need updating.
+          }
+          break;
+        case FLOAT:
+          float aFloat = valueBuffer.getFloat(timeBufferLength + pos * 4);
+          if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aFloat))) {
+            // update statistics of chunkMetadata1
+            statistics.updateStats(aFloat, timestamp);
+            // ATTENTION: do not use the update() interface, which would also update StepRegress!
+            // Only updateStats: in fact only BP and TP need updating.
+          }
+          break;
+        case DOUBLE:
+          double aDouble = valueBuffer.getDouble(timeBufferLength + pos * 8);
+          if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aDouble))) {
+            // update statistics of chunkMetadata1
+            statistics.updateStats(aDouble, timestamp);
+            // ATTENTION: do not use the update() interface, which would also update StepRegress!
+            // Only updateStats: in fact only BP and TP need updating.
+          }
+          break;
+        default:
+          throw new UnSupportedDataTypeException(String.valueOf(dataType));
+      }
+    }
+    chunkSuit4CPV.statistics.setMinInfo(statistics.getMinInfo());
+    chunkSuit4CPV.statistics.setMaxInfo(statistics.getMaxInfo());
+  }
+
   /**
    * Timestamps in a chunk increase monotonically, so the scan can stop as soon as a point's
    * timestamp is greater than or equal to candidateTimestamp.
   *
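That javadoc describes the existence probe used to invalidate candidates (chunk data read operation (a)): because timestamps inside a chunk increase monotonically, the scan may stop at the first timestamp at or beyond the candidate. A standalone sketch over a plain long[] (assumed input; the committed code reads the same way from the page's time buffer):

    // Early-exit probe: returns true iff candidateTimestamp occurs in the chunk.
    static boolean exists(long[] timestamps, long candidateTimestamp) {
      for (long t : timestamps) {
        if (t > candidateTimestamp) {
          return false; // passed the spot: not present
        }
        if (t == candidateTimestamp) {
          return true; // found it
        }
      }
      return false;
    }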
