This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/1.3.0-binxin in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 169b9de6010191ee650d53edaf038353dc2607e6 Author: JackieTien97 <[email protected]> AuthorDate: Fri Jan 12 19:07:29 2024 +0800 add timeset in AlignedChunkReader --- .../read/reader/chunk/AlignedChunkReader.java | 10 +- .../tsfile/read/reader/page/AlignedPageReader.java | 220 +++++++-------------- 2 files changed, 78 insertions(+), 152 deletions(-) diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java index bbbb5fc8864..7edc0c48a89 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java @@ -40,8 +40,10 @@ import org.apache.iotdb.tsfile.read.reader.page.AlignedPageReader; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Set; public class AlignedChunkReader implements IChunkReader { @@ -67,6 +69,8 @@ public class AlignedChunkReader implements IChunkReader { /** A list of deleted intervals. */ private final List<List<TimeRange>> valueDeleteIntervalList; + private final Set<Long> timeSet = new HashSet<>(); + /** * Constructor of ChunkReader without deserializing chunk into page. This is used for fast * compaction. @@ -271,7 +275,8 @@ public class AlignedChunkReader implements IChunkReader { valueDataTypeList, valueDecoderList, filter, - queryAllSensors); + queryAllSensors, + timeSet); alignedPageReader.setDeleteIntervalList(valueDeleteIntervalList); return alignedPageReader; } @@ -320,7 +325,8 @@ public class AlignedChunkReader implements IChunkReader { valueTypes, valueDecoders, null, - false); + false, + new HashSet<>()); alignedPageReader.initTsBlockBuilder(valueTypes); alignedPageReader.setDeleteIntervalList(valueDeleteIntervalList); return alignedPageReader.getLazyPointReader(); diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java index 24c85ea7398..dbbfc0b364b 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java @@ -42,6 +42,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Set; import static org.apache.iotdb.tsfile.read.reader.series.PaginationController.UNLIMITED_PAGINATION_CONTROLLER; @@ -63,6 +64,8 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader { private boolean isModified; private TsBlockBuilder builder; + private final Set<Long> timeSet; + private static final int MASK = 0x80; @SuppressWarnings("squid:S107") @@ -75,7 +78,8 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader { List<TSDataType> valueDataTypeList, List<Decoder> valueDecoderList, Filter filter, - boolean queryAllSensors) { + boolean queryAllSensors, + Set<Long> timeSet) { timePageReader = new TimePageReader(timePageHeader, timePageData, timeDecoder); isModified = timePageReader.isModified(); valuePageReaderList = new ArrayList<>(valuePageHeaderList.size()); @@ -96,6 +100,7 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader { this.filter = filter; this.valueCount = valuePageReaderList.size(); this.queryAllSensors = queryAllSensors; + this.timeSet = timeSet; } @Override @@ -190,174 +195,89 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader { long[] timeBatch = timePageReader.getNextTimeBatch(); - if (canGoFastWay()) { - // all page data satisfy - if (filter == null || filter.allSatisfy(getTimeStatistics())) { - // skip all the page - if (paginationController.hasCurOffset(timeBatch.length)) { - paginationController.consumeOffset(timeBatch.length); - } else { - int readStartIndex = - paginationController.hasCurOffset() ? (int) paginationController.getCurOffset() : 0; - // consume the remaining offset - paginationController.consumeOffset(readStartIndex); - // not included - int readEndIndex = - (paginationController.hasCurLimit() && paginationController.getCurLimit() > 0) - && (paginationController.getCurLimit() - < timeBatch.length - readStartIndex + 1) - ? readStartIndex + (int) paginationController.getCurLimit() - : timeBatch.length; - if (paginationController.hasCurLimit() && paginationController.getCurLimit() > 0) { - paginationController.consumeLimit((long) readEndIndex - readStartIndex); - } - - // construct time column - for (int i = readStartIndex; i < readEndIndex; i++) { - builder.getTimeColumnBuilder().writeLong(timeBatch[i]); - builder.declarePosition(); - } - - // construct value columns - for (int i = 0; i < valueCount; i++) { - ValuePageReader pageReader = valuePageReaderList.get(i); - if (pageReader != null) { - pageReader.writeColumnBuilderWithNextBatch( - readStartIndex, readEndIndex, builder.getColumnBuilder(i)); - } else { - builder.getColumnBuilder(i).appendNull(readEndIndex - readStartIndex); - } - } + // if all the sub sensors' value are null in current row, just discard it + // if !filter.satisfy, discard this row + boolean[] keepCurrentRow = new boolean[timeBatch.length]; + for (int i = 0, n = timeBatch.length; i < n; i++) { + keepCurrentRow[i] = timeSet.add(timeBatch[i]); + } + if (filter != null) { + for (int i = 0, n = timeBatch.length; i < n; i++) { + if (keepCurrentRow[i]) { + keepCurrentRow[i] = filter.satisfy(timeBatch[i], null); } - } else { + } + } - // if all the sub sensors' value are null in current row, just discard it - // if !filter.satisfy, discard this row - boolean[] keepCurrentRow = new boolean[timeBatch.length]; - if (filter == null) { - Arrays.fill(keepCurrentRow, true); - } else { - for (int i = 0, n = timeBatch.length; i < n; i++) { - keepCurrentRow[i] = filter.satisfy(timeBatch[i], null); - } - } + boolean[][] isDeleted = null; + if (valueCount != 0) { + // using bitMap in valuePageReaders to indicate whether columns of current row are all null. + byte[] bitmask = new byte[(timeBatch.length - 1) / 8 + 1]; + Arrays.fill(bitmask, (byte) 0x00); + isDeleted = new boolean[valueCount][timeBatch.length]; + for (int columnIndex = 0; columnIndex < valueCount; columnIndex++) { + ValuePageReader pageReader = valuePageReaderList.get(columnIndex); + if (pageReader != null) { + byte[] bitmap = pageReader.getBitmap(); + pageReader.fillIsDeleted(timeBatch, isDeleted[columnIndex]); - // construct time column - int readEndIndex = timeBatch.length; - for (int i = 0; i < timeBatch.length; i++) { - if (keepCurrentRow[i]) { - if (paginationController.hasCurOffset()) { - paginationController.consumeOffset(); - keepCurrentRow[i] = false; - } else if (paginationController.hasCurLimit()) { - builder.getTimeColumnBuilder().writeLong(timeBatch[i]); - builder.declarePosition(); - paginationController.consumeLimit(); - } else { - readEndIndex = i; - break; + for (int i = 0, n = isDeleted[columnIndex].length; i < n; i++) { + if (isDeleted[columnIndex][i]) { + int shift = i % 8; + bitmap[i / 8] = (byte) (bitmap[i / 8] & (~(MASK >>> shift))); } } - } - - // construct value columns - for (int i = 0; i < valueCount; i++) { - ValuePageReader pageReader = valuePageReaderList.get(i); - if (pageReader != null) { - pageReader.writeColumnBuilderWithNextBatch( - readEndIndex, builder.getColumnBuilder(i), keepCurrentRow); - } else { - for (int j = 0; j < readEndIndex; j++) { - if (keepCurrentRow[j]) { - builder.getColumnBuilder(i).appendNull(); - } - } + for (int i = 0, n = bitmask.length; i < n; i++) { + bitmask[i] = (byte) (bitmap[i] | bitmask[i]); } } } - } else { - // if all the sub sensors' value are null in current row, just discard it - // if !filter.satisfy, discard this row - boolean[] keepCurrentRow = new boolean[timeBatch.length]; - if (filter == null) { - Arrays.fill(keepCurrentRow, true); - } else { - for (int i = 0, n = timeBatch.length; i < n; i++) { - keepCurrentRow[i] = filter.satisfy(timeBatch[i], null); - } - } - - boolean[][] isDeleted = null; - if (valueCount != 0) { - // using bitMap in valuePageReaders to indicate whether columns of current row are all null. - byte[] bitmask = new byte[(timeBatch.length - 1) / 8 + 1]; - Arrays.fill(bitmask, (byte) 0x00); - isDeleted = new boolean[valueCount][timeBatch.length]; - for (int columnIndex = 0; columnIndex < valueCount; columnIndex++) { - ValuePageReader pageReader = valuePageReaderList.get(columnIndex); - if (pageReader != null) { - byte[] bitmap = pageReader.getBitmap(); - pageReader.fillIsDeleted(timeBatch, isDeleted[columnIndex]); - - for (int i = 0, n = isDeleted[columnIndex].length; i < n; i++) { - if (isDeleted[columnIndex][i]) { - int shift = i % 8; - bitmap[i / 8] = (byte) (bitmap[i / 8] & (~(MASK >>> shift))); - } - } - for (int i = 0, n = bitmask.length; i < n; i++) { - bitmask[i] = (byte) (bitmap[i] | bitmask[i]); - } + for (int i = 0, n = bitmask.length; i < n; i++) { + if (bitmask[i] == (byte) 0xFF) { + // 8 rows are not all null, do nothing + } else if (bitmask[i] == (byte) 0x00) { + for (int j = 0; j < 8 && (i * 8 + j < keepCurrentRow.length); j++) { + keepCurrentRow[i * 8 + j] = false; } - } - - for (int i = 0, n = bitmask.length; i < n; i++) { - if (bitmask[i] == (byte) 0xFF) { - // 8 rows are not all null, do nothing - } else if (bitmask[i] == (byte) 0x00) { - for (int j = 0; j < 8 && (i * 8 + j < keepCurrentRow.length); j++) { + } else { + for (int j = 0; j < 8 && (i * 8 + j < keepCurrentRow.length); j++) { + if (((bitmask[i] & 0xFF) & (MASK >>> j)) == 0) { keepCurrentRow[i * 8 + j] = false; } - } else { - for (int j = 0; j < 8 && (i * 8 + j < keepCurrentRow.length); j++) { - if (((bitmask[i] & 0xFF) & (MASK >>> j)) == 0) { - keepCurrentRow[i * 8 + j] = false; - } - } } } } + } - // construct time column - int readEndIndex = timeBatch.length; - for (int i = 0; i < timeBatch.length; i++) { - if (keepCurrentRow[i]) { - if (paginationController.hasCurOffset()) { - paginationController.consumeOffset(); - keepCurrentRow[i] = false; - } else if (paginationController.hasCurLimit()) { - builder.getTimeColumnBuilder().writeLong(timeBatch[i]); - builder.declarePosition(); - paginationController.consumeLimit(); - } else { - readEndIndex = i; - break; - } + // construct time column + int readEndIndex = timeBatch.length; + for (int i = 0; i < timeBatch.length; i++) { + if (keepCurrentRow[i]) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + keepCurrentRow[i] = false; + } else if (paginationController.hasCurLimit()) { + builder.getTimeColumnBuilder().writeLong(timeBatch[i]); + builder.declarePosition(); + paginationController.consumeLimit(); + } else { + readEndIndex = i; + break; } } + } - // construct value columns - for (int i = 0; i < valueCount; i++) { - ValuePageReader pageReader = valuePageReaderList.get(i); - if (pageReader != null) { - pageReader.writeColumnBuilderWithNextBatch( - readEndIndex, builder.getColumnBuilder(i), keepCurrentRow, isDeleted[i]); - } else { - for (int j = 0; j < readEndIndex; j++) { - if (keepCurrentRow[j]) { - builder.getColumnBuilder(i).appendNull(); - } + // construct value columns + for (int i = 0; i < valueCount; i++) { + ValuePageReader pageReader = valuePageReaderList.get(i); + if (pageReader != null) { + pageReader.writeColumnBuilderWithNextBatch( + readEndIndex, builder.getColumnBuilder(i), keepCurrentRow, isDeleted[i]); + } else { + for (int j = 0; j < readEndIndex; j++) { + if (keepCurrentRow[j]) { + builder.getColumnBuilder(i).appendNull(); } } }
