This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/fixMergeReader in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 850e12daf33af818a7e13efd7163fce34ad5c7f9 Author: Minghui Liu <[email protected]> AuthorDate: Thu Feb 22 17:02:52 2024 +0800 fix merge reader bug --- .../execution/operator/source/SeriesScanUtil.java | 38 ++++++--------- .../tsfile/read/common/block/TsBlockUtil.java | 55 ++++++++++++++++++++++ .../tsfile/read/reader/page/AlignedPageReader.java | 54 +++------------------ 3 files changed, 75 insertions(+), 72 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java index 77375d41fac..cd1dc52e575 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java @@ -40,6 +40,7 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; +import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.reader.IPageReader; import org.apache.iotdb.tsfile.read.reader.IPointReader; @@ -643,7 +644,9 @@ public class SeriesScanUtil { if (hasCachedNextOverlappedPage) { hasCachedNextOverlappedPage = false; - TsBlock res = cachedTsBlock; + TsBlock res = + applyPushDownFilterAndLimitOffset( + cachedTsBlock, scanOptions.getPushDownFilter(), paginationController); cachedTsBlock = null; // cached tsblock has handled by pagination controller & push down filter, return directly @@ -672,6 +675,15 @@ public class SeriesScanUtil { } } + private TsBlock applyPushDownFilterAndLimitOffset( + TsBlock tsBlock, Filter pushDownFilter, PaginationController paginationController) { + if (pushDownFilter == null) { + return paginationController.applyTsBlock(tsBlock); + } + return TsBlockUtil.applyFilterAndLimitOffsetToTsBlock( + tsBlock, new TsBlockBuilder(getTsDataTypeList()), pushDownFilter, paginationController); + } + private void filterFirstPageReader() { if (firstPageReader == null) { return; @@ -708,7 +720,6 @@ public class SeriesScanUtil { @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning private boolean hasNextOverlappedPage() throws IOException { long startTime = System.nanoTime(); - Filter pushDownFilter = scanOptions.getPushDownFilter(); try { if (hasCachedNextOverlappedPage) { return true; @@ -810,9 +821,7 @@ public class SeriesScanUtil { // get the latest first point in mergeReader timeValuePair = mergeReader.nextTimeValuePair(); - if (processFilterAndPagination(timeValuePair, pushDownFilter, builder)) { - break; - } + addTimeValuePairToResult(timeValuePair, builder); } hasCachedNextOverlappedPage = !builder.isEmpty(); cachedTsBlock = builder.build(); @@ -875,25 +884,6 @@ public class SeriesScanUtil { unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime); } - private boolean processFilterAndPagination( - TimeValuePair timeValuePair, Filter pushDownFilter, TsBlockBuilder builder) { - if (pushDownFilter != null - && !pushDownFilter.satisfyRow(timeValuePair.getTimestamp(), timeValuePair.getValues())) { - return false; - } - if (paginationController.hasCurOffset()) { - paginationController.consumeOffset(); - return false; - } - if (paginationController.hasCurLimit()) { - addTimeValuePairToResult(timeValuePair, builder); - paginationController.consumeLimit(); - return false; - } else { - return true; - } - } - private void addTimeValuePairToResult(TimeValuePair timeValuePair, TsBlockBuilder builder) { builder.getTimeColumnBuilder().writeLong(timeValuePair.getTimestamp()); switch (dataType) { diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java index 8e10bcea023..3bf472088a7 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java @@ -21,6 +21,8 @@ package org.apache.iotdb.tsfile.read.common.block; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.reader.series.PaginationController; public class TsBlockUtil { @@ -65,4 +67,57 @@ public class TsBlockUtil { } return left; } + + public static TsBlock applyFilterAndLimitOffsetToTsBlock( + TsBlock unFilteredBlock, + TsBlockBuilder builder, + Filter pushDownFilter, + PaginationController paginationController) { + boolean[] keepCurrentRow = pushDownFilter.satisfyTsBlock(unFilteredBlock); + + // construct time column + int readEndIndex = + buildTimeColumnWithPagination( + unFilteredBlock, builder, keepCurrentRow, paginationController); + + // construct value columns + for (int i = 0; i < builder.getValueColumnBuilders().length; i++) { + for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) { + if (keepCurrentRow[rowIndex]) { + if (unFilteredBlock.getValueColumns()[i].isNull(rowIndex)) { + builder.getColumnBuilder(i).appendNull(); + } else { + builder + .getColumnBuilder(i) + .writeObject(unFilteredBlock.getValueColumns()[i].getObject(rowIndex)); + } + } + } + } + return builder.build(); + } + + private static int buildTimeColumnWithPagination( + TsBlock unFilteredBlock, + TsBlockBuilder builder, + boolean[] keepCurrentRow, + PaginationController paginationController) { + int readEndIndex = unFilteredBlock.getPositionCount(); + for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) { + if (keepCurrentRow[rowIndex]) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + keepCurrentRow[rowIndex] = false; + } else if (paginationController.hasCurLimit()) { + builder.getTimeColumnBuilder().writeLong(unFilteredBlock.getTimeByIndex(rowIndex)); + builder.declarePosition(); + paginationController.consumeLimit(); + } else { + readEndIndex = rowIndex; + break; + } + } + } + return readEndIndex; + } } diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java index b32a64cfdb9..c24b76798e4 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java @@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.read.common.BatchDataFactory; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; +import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.reader.IPageReader; import org.apache.iotdb.tsfile.read.reader.IPointReader; @@ -205,11 +206,14 @@ public class AlignedPageReader implements IPageReader { // construct value columns buildValueColumns(readEndIndex, keepCurrentRow, isDeleted); + TsBlock unFilteredBlock = builder.build(); if (pushDownFilterAllSatisfy) { // OFFSET & LIMIT has been consumed in buildTimeColumn - return builder.build(); + return unFilteredBlock; } - return applyPushDownFilter(); + builder.reset(); + return TsBlockUtil.applyFilterAndLimitOffsetToTsBlock( + unFilteredBlock, builder, pushDownFilter, paginationController); } private void buildResultWithoutAnyFilterAndDelete(long[] timeBatch) { @@ -279,26 +283,6 @@ public class AlignedPageReader implements IPageReader { return readEndIndex; } - private int buildTimeColumnWithPagination(TsBlock unFilteredBlock, boolean[] keepCurrentRow) { - int readEndIndex = unFilteredBlock.getPositionCount(); - for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) { - if (keepCurrentRow[rowIndex]) { - if (paginationController.hasCurOffset()) { - paginationController.consumeOffset(); - keepCurrentRow[rowIndex] = false; - } else if (paginationController.hasCurLimit()) { - builder.getTimeColumnBuilder().writeLong(unFilteredBlock.getTimeByIndex(rowIndex)); - builder.declarePosition(); - paginationController.consumeLimit(); - } else { - readEndIndex = rowIndex; - break; - } - } - } - return readEndIndex; - } - private int buildTimeColumnWithoutPagination(long[] timeBatch, boolean[] keepCurrentRow) { int readEndIndex = 0; for (int i = 0; i < timeBatch.length; i++) { @@ -386,32 +370,6 @@ public class AlignedPageReader implements IPageReader { } } - private TsBlock applyPushDownFilter() { - TsBlock unFilteredBlock = builder.build(); - builder.reset(); - - boolean[] keepCurrentRow = pushDownFilter.satisfyTsBlock(unFilteredBlock); - - // construct time column - int readEndIndex = buildTimeColumnWithPagination(unFilteredBlock, keepCurrentRow); - - // construct value columns - for (int i = 0; i < valueCount; i++) { - for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) { - if (keepCurrentRow[rowIndex]) { - if (unFilteredBlock.getValueColumns()[i].isNull(rowIndex)) { - builder.getColumnBuilder(i).appendNull(); - } else { - builder - .getColumnBuilder(i) - .writeObject(unFilteredBlock.getValueColumns()[i].getObject(rowIndex)); - } - } - } - } - return builder.build(); - } - public void setDeleteIntervalList(List<List<TimeRange>> list) { for (int i = 0; i < valueCount; i++) { if (valuePageReaderList.get(i) != null) {
